Compare commits

...

7 Commits

Author SHA1 Message Date
Khalil Meftah 4af7095693 Merge branch 'main' into feat/rollout/dagger-episode-save 2026-07-03 16:50:10 +02:00
Pepijn e275ea3960 LingBot-VA: video-action world model (#3731)
* feat(policies): add LingBot-VA autoregressive video-action world model

Port the LingBot-VA policy (Wan2.2 dual-stream video+action world model) into
LeRobot, following the EO-1 / VLA-JEPA conventions. Covers inference, checkpoint
conversion, and predicted-video saving (training is deferred to a follow-up PR).

- Vendored Wan transformer/attention/flex/VAE/scheduler modules (key names preserved
  for near-identity conversion); torch SDPA default, flashattn/flex lazy-guarded.
- LingBotVAConfig (registered "lingbot_va") + processor with fixed-quantile action
  unnormalization; full dual-stream sampling loop with CFG, two flow-matching
  schedulers and KV cache, mapped onto select_action with observed-keyframe feedback.
- convert_lingbot_va_checkpoints.py (libero/robotwin variants): bundles the ~5B
  transformer, lazy-pulls the frozen VAE+UMT5 from the source repo.
- Predicted-video plumbing in lerobot_eval (predicted_frames_callback; opt-in via
  --policy.save_predicted_video) and ConstantWithWarmupSchedulerConfig.
- pyproject: widen diffusers-dep to <0.37, add lingbot_va + imageio-dep extras,
  add lingbot_va and (missing) eo1 to `all`.
- Factory + policies/__init__ wiring, docs page + toctree, and tests.

Note: the LIBERO success-rate correctness gate must be validated on a CUDA GPU
with the converted checkpoint.

* feat(lingbot_va): RoboTwin eef-pose eval, single-file model, Hub checkpoints

Make the LingBot-VA port runnable on both LIBERO and RoboTwin and clean up the
package to LeRobot conventions.

- Consolidate all vendored Wan2.2 model code (transformer, attention, VAE helpers,
  flow-matching scheduler, grid utils, flex-attention) into a single
  modeling_lingbot_va.py; remove the separate wan_*/schedulers modules.
- Move the fixed action (un)normalization quantiles out of the config and into the
  post-processor (LIBERO 7-DoF + RoboTwin 16-d eef); remove the conversion script in
  favour of ready-to-use LeRobot-format checkpoints on the Hub.
- Fixes found via on-sim validation: undo LIBERO's 180-degree image flip
  (image_hflip), encode obs as a multi-frame streaming-VAE clip, reset the streaming
  VAE cache between episodes, run the transformer in config.dtype, lazy-load frozen
  VAE/UMT5 by subfolder with the text encoder on CPU.
- RoboTwin: add an end-effector-pose action mode to RoboTwinEnv (16-d per-arm
  xyz+quat+gripper deltas composed onto the initial eef pose, executed via CuRobo IK)
  and the robotwin_tshape latent layout (full-res head + half-res wrists via a second
  streaming VAE) with the upstream RoboTwin action quantiles + camera mapping.
- Predicted-video saving works for both benchmarks; docs + tests updated.

* feat(lingbot_va): implement training / fine-tuning (flow-matching loss)

- Implement LingBotVAPolicy.forward(): dual-stream flow-matching training loss
  (latent + action, timestep-weighted, action-masked) ported from upstream train.py;
  VAE-encodes camera clips, UMT5-encodes the task, noises both streams, runs the
  block-causal flex-attention training pass (forward_train).
- training_loss_from_streams() core + _build_training_streams() data prep (action
  scatter into the 30-d space, multi-frame VAE encode incl. robotwin_tshape).
- get_optim_params returns only trainable transformer params (LoRA/PEFT friendly);
  VAE/UMT5 stay frozen. Training needs attn_mode='flex'.
- Add a tiny-config single-training-step test (forward->loss->backward->AdamW) and a
  Training/fine-tuning section in the docs.

* fix(lingbot_va): CI quality gate + fast-test collection

- Add tests/policies/lingbot_va/__init__.py so the test files don't clash by basename
  with tests/policies/vla_jepa/* under pytest's default import mode (fast-test collection error).
- Fix vendored typos flagged by the typos hook (pach_scale->patch_scale, total_tolen->
  total_token_len, stablized->stabilized) and a mypy union-attr in RoboTwinEnv._read_eef_pose.
- Apply Prettier formatting to docs/source/lingbot_va.mdx.

* docs(lingbot_va): document EEF action-channel schema + camera order

* Update lingbot_va.mdx

Signed-off-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* Update pyproject.toml

Signed-off-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* Update pyproject.toml

Signed-off-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* refactor(lingbot_va): drop hardcoded action quantiles; source from checkpoint

The LIBERO/RoboTwin action (un)normalization quantiles were hardcoded as module
constants in processor_lingbot_va.py. They are already serialized into each
checkpoint's policy_postprocessor.json (via LingBotVAActionUnnormalizeStep.get_config)
and restored on load by PolicyProcessorPipeline.from_pretrained, so the constants are
dead at eval/load time for the released checkpoints (verified: libero_long/robotwin/base
all carry their quantiles on the Hub).

- Remove LIBERO_ACTION_Q01/Q99, ROBOTWIN_ACTION_Q01/Q99 and _default_action_quantiles.
- make_lingbot_va_pre_post_processors now defaults a fresh (unconverted) build to a
  neutral [-1, 1] mapping (identity rescale); real per-benchmark stats come from the
  saved checkpoint (or postprocessor_overrides), analogous to dataset-stats normalization.
- Update the config doc comment to point at the checkpoint as the source of truth.
- Tests: replace the LIBERO-default assertion with a neutral-default check, and add a
  save_pretrained/from_pretrained round-trip guard for the quantile serialization.

* docs(lingbot_va): trim verbose comments

- configuration_lingbot_va.py: condense multi-line field comments to one-liners
  (keep the ── section headers).
- processor_lingbot_va.py: shorten the action-quantile explanation block.
- modeling_lingbot_va.py: drop the bare "# ----" separator rules, keeping the
  one-line section headers.

No code changes.

* docs(lingbot_va): trim provenance comments; default wan path to base repo

- configuration_lingbot_va.py: drop the "──" decorations and the
  "(from transformer/config.json)" note; default wan_pretrained_path to
  robbyant/lingbot-va-base (has the frozen vae/text_encoder/tokenizer subfolders).
- modeling_lingbot_va.py: remove the vendored-code banner and the
  "(upstream wan_va/...)" section-header provenance/dash decorations; condense the
  transformer-dtype comment to one line.

No code changes.

* refactor(lingbot_va): use built-in UnnormalizerProcessorStep for actions

Replace the bespoke LingBotVAActionUnnormalizeStep with the standard
UnnormalizerProcessorStep in QUANTILES mode, which computes the identical
(action + 1) / 2 * (q99 - q01) + q01 mapping. The per-channel q01/q99 are stored
as the step's saved state (a safetensors file) and restored on load; a fresh build
has no action stats so the step is an identity passthrough.

The 3 Hub checkpoints (lerobot/lingbot_va_{libero_long,robotwin,base}) have been
re-uploaded with the new post-processor (policy_postprocessor.json +
*_unnormalizer_processor.safetensors); reloading from the Hub round-trips q01/q99.

- processor_lingbot_va.py: drop the custom step + registry; build the post-processor
  with UnnormalizerProcessorStep (explicit ACTION->QUANTILES norm_map so the
  preprocessor / training path is unchanged).
- tests: assert the built-in step is used, identity-when-no-stats, correct quantile
  unnormalization, and a save_pretrained/from_pretrained stats round-trip.

* docs(lingbot_va): point checkpoint paths at the lerobot org

The LeRobot-format checkpoints moved from pepijn223/* to lerobot/* (libero_long,
robotwin, base). Update the eval/train --policy.path examples accordingly.

* docs(lingbot_va): condense processor normalization comments

* fix(lingbot-va): align RoboTwin evaluation (#3784)

Thank you for the RoboTwin fix, and alignment!

* applying fixes

* updating uv lock and linting

* adjusting test to match expected values

* cleaning up deps

* cleaning up top level imports, styling, and deps guards

* cleanup
* moving wan utils and loading utils to `utils.py`
* removing ftfy by replicating the prompt_clean function without it (we don't expect to have weird chars given in the prompt anyway)

* removing unused function

* guarding for scipy dep, renaming test to avoid collision

* adding back accelerate for peak memory usage optim + justifying robotwin description dep

---------

Signed-off-by: Pepijn <138571049+pkooij@users.noreply.github.com>
Co-authored-by: pepijn223 <pepijn223@hf.co>
Co-authored-by: Gangwei XU <gwxu@hust.edu.cn>
Co-authored-by: Maxime Ellerbach <maxime.ellerbach@huggingface.co>
2026-07-03 13:32:38 +02:00
Nikodem Bartnik 911734ec9c Docs/improve HF jobs documentation (#3909)
* improve hf jobs docs

* Update docs/source/hardware_guide.mdx

Co-authored-by: Nicolas Rabault <rabault.nicolas@gmail.com>
Signed-off-by: Nikodem Bartnik <39432165+NikodemBartnik@users.noreply.github.com>

---------

Signed-off-by: Nikodem Bartnik <39432165+NikodemBartnik@users.noreply.github.com>
Co-authored-by: Nicolas Rabault <rabault.nicolas@gmail.com>
2026-07-03 11:39:16 +02:00
Khalil Meftah 46d4ddc698 chore(rollout): log episode success label and buffer length 2026-07-02 19:12:10 +02:00
Khalil Meftah b29ba27977 fix(rollout): guard empty buffer save 2026-07-02 18:02:59 +02:00
Khalil Meftah 599e2432e5 fix(rollout): clear last_action after return_to_initial 2026-07-02 18:02:36 +02:00
Khalil Meftah 44f76dbbf0 feat(rollout): add episode success/failure labeling to DAgger strategy
Enable operators to mark episodes as success or failure during DAgger
data collection. Pressing 's' or 'f' immediately saves the episode
with the appropriate label and returns the robot to its initial position.

- Add success/failure key bindings to DAggerKeyboardConfig
- Add save_episode_requested event and episode_success state to DAggerEvents
- Stamp next.success=True on terminal frame for successful episodes
- Pause and return to initial position after manual save for env reset
- Add num_episodes target to stop continuous recording automatically
- Defer save during corrections to avoid splitting mid-intervention
2026-07-02 17:48:02 +02:00
26 changed files with 3247 additions and 122 deletions
+4
View File
@@ -22,6 +22,10 @@ outputs
rl
media
# Local virtualenvs (the image provides its own)
.venv
venv
# Logging
logs
+2
View File
@@ -69,6 +69,8 @@
title: VLA-JEPA
- local: eo1
title: EO-1
- local: lingbot_va
title: LingBot-VA
- local: fastwam
title: FastWAM
- local: groot
+9 -9
View File
@@ -82,18 +82,18 @@ VRAM is the first filter. Within a tier, pick by budget and availability — the
### Hugging Face Jobs
[Hugging Face Jobs](https://huggingface.co/docs/hub/jobs) lets you run training on managed HF infrastructure, billed by the second. The repo publishes a ready-to-use image: **`huggingface/lerobot-gpu:latest`**, rebuilt **every night at 02:00 UTC from `main`** ([`docker_publish.yml`](https://github.com/huggingface/lerobot/blob/main/.github/workflows/docker_publish.yml)) — so it tracks the current state of the repo, not a tagged release.
[Hugging Face Jobs](https://huggingface.co/docs/hub/jobs) lets you run training on managed HF infrastructure, billed by the second, without owning a GPU. `lerobot-train` submits and streams the job for you — just add `--job.target=<flavor>` to a normal training command:
```bash
hf jobs run --flavor a10g-large huggingface/lerobot-gpu:latest \
bash -c "nvidia-smi && lerobot-train \
--policy.type=act --dataset.repo_id=<USER>/<DATASET> \
--policy.repo_id=<USER>/act_<task> --batch_size=8 --steps=50000"
lerobot-train \
--policy.type=act --dataset.repo_id=<USER>/<DATASET> \
--policy.repo_id=<USER>/act_<task> \
--job.target=a10g-large
```
Notes:
- The leading `nvidia-smi` is a quick sanity check that CUDA is visible inside the container — useful to fail fast if the flavor or driver mismatched.
- The default Job timeout is 30 minutes; pass `--timeout 4h` (or longer) for real training.
- `--flavor` maps onto the table above: `t4-small`/`t4-medium` (T4, ACT only), `l4x1`/`l4x4` (L4 24 GB), `a10g-small/large/largex2/largex4` (A10G 24 GB scaled out), `a100-large` (A100). For the current full catalogue + pricing see [https://huggingface.co/docs/hub/jobs](https://huggingface.co/docs/hub/jobs).
- Prefer not to write the `hf jobs run` wrapper yourself? `lerobot-train` can submit the job for you: just add `--job.target=<flavor>` to a normal training command and it handles dataset upload, log streaming, and the final model push. See the [imitation-learning training guide](./il_robots).
- Run `hf auth login` once before submitting, the job runs under your token.
- `--job.target` maps onto the table above: `t4-small`/`t4-medium` (T4, ACT only), `l4x1`/`l4x4` (L4 24 GB), `a10g-small/large/largex2/largex4` (A10G 24 GB scaled out), `a100-large` (A100). List the current catalogue with pricing via `hf jobs hardware`, or see [https://huggingface.co/docs/hub/jobs](https://huggingface.co/docs/hub/jobs).
- The job defaults to a `2d` (48h) timeout. Override it with `--job.timeout=4h` (or any other valid duration string) to shorten or extend the timeout. The job automatically stops when the command completes.
- For the full walkthrough — dataset upload, checkpoint streaming, resuming a run on a job — see the [imitation-learning training guide](./il_robots#train-using-hugging-face-jobs).
+1 -78
View File
@@ -532,84 +532,7 @@ If your local computer doesn't have a powerful GPU you could utilize Google Cola
Hugging Face jobs let's you easily select hardware and run the training in the cloud. So if you don't have a powerful GPU or you need more VRAM or just want to train a model much faster use HF Jobs! It's pay as you go and you simply pay for each second of use, you can see the pricing and additional information [here](https://huggingface.co/docs/hub/jobs).
> **Tip:** if you just want to launch a standard training run, you can skip building the command below and use the integrated **Train on HF Jobs via `--job.target`** flow described further down — `lerobot-train` then submits the job, uploads a local-only dataset for you, and streams the logs.
To run the training manually use this command:
<hfoptions id="train_with_hf_jobs">
<hfoption id="Command">
```bash
hf jobs run \
--flavor a10g-small \
--timeout 4h \
--secrets HF_TOKEN \
huggingface/lerobot-gpu:latest \
-- \
python -m lerobot.scripts.lerobot_train \
--dataset.repo_id=username/dataset \
--policy.type=act \
--steps=5000 \
--batch_size=16 \
--policy.device=cuda \
--policy.repo_id=username/your_policy \
--log_freq=100
```
</hfoption>
<hfoption id="API example">
<!-- prettier-ignore-start -->
```python
from huggingface_hub import run_job, get_token
run_name = "act_so101_hf_jobs"
dataset_id = "username/dataset"
user_hub_id = "username"
command_args = [
"python", "-m", "lerobot.scripts.lerobot_train",
"--dataset.repo_id", dataset_id,
"--policy.type", "act",
"--steps", "5000",
"--batch_size", "16",
"--num_workers", "4",
"--policy.device", "cuda",
"--log_freq", "100",
"--save_freq", "1000",
"--save_checkpoint", "true",
"--wandb.enable", "false",
"--policy.repo_id", f"{user_hub_id}/{run_name}"
]
print(f"Submitting job '{run_name}' to Hugging Face Infrastructure...")
job_info = run_job(
image="huggingface/lerobot-gpu:latest",
command=command_args,
flavor="a10g-small",
timeout="4h",
secrets={"HF_TOKEN": get_token()}
)
print("\n🚀 Job successfully launched!")
print(f"🔹 Job ID: {job_info.id}")
print(f"🔗 Live UI Dashboard & Logs: {job_info.url}")
```
<!-- prettier-ignore-end -->
</hfoption>
</hfoptions>
You can modify the `--flavor` to use different hardware, for example: `t4-small`, `a100-large`, `h200`. Use `hf jobs hardware` to see the full list with pricing.
Depending on the model you want to train and the hardware you selected you can also modify the `--batch_size` and `--number_of_workers`.
For longer training sessions increase the timeout.
Once the training is started you can go to [Jobs](https://huggingface.co/settings/jobs) and see if your jobs is running as well as all the outputs. Sometimes it takes a few minutes to schedule your job so be patient.
After training the model will be pushed to hub and you can use it as any other model with LeRobot.
#### Train on HF Jobs via `--job.target` (integrated CLI)
`lerobot-train` runs locally by default. To run on a HuggingFace GPU without constructing the Docker command yourself, pass `--job.target` with a hardware flavor name:
`lerobot-train` runs locally by default. To run on a HuggingFace GPU, pass `--job.target` with a hardware flavor name:
```bash
lerobot-train \
+187
View File
@@ -0,0 +1,187 @@
# LingBot-VA
LingBot-VA is an **autoregressive video-action world-model policy** built on the **Wan2.2**
video-diffusion stack. It interleaves, in one autoregressive sequence, the prediction of
future **video latents** and **robot actions** ("VA" = Video-Action). The LeRobot
integration wires LingBot-VA into the standard training, evaluation and processor
interfaces.
## Model Overview
LingBot-VA is a **dual-stream "mixture-of-transformers"**: a video/latent stream
(`patch_embedding_mlp → blocks → proj_out`) and an action stream
(`action_embedder → blocks → action_proj_out`) share the same 30 transformer blocks and
text conditioning.
| Component | Class | Role |
| ------------------------ | ----------------------- | ----------------------------------------------------------- |
| DiT backbone (trainable) | `WanTransformer3DModel` | ~5B-param dual-stream transformer. |
| VAE (frozen) | `AutoencoderKLWan` | Wan2.2 VAE, `z_dim=48`. Lazy-pulled from the source repo. |
| Text encoder (frozen) | `UMT5EncoderModel` | UMT5-XXL, `d_model=4096`. Lazy-pulled from the source repo. |
At inference the policy runs an autoregressive loop per chunk: it denoises the video-latent
stream (CFG, ~20 steps) and the action stream (~50 steps) with two independent
flow-matching schedulers, maintaining a KV cache across chunks. Real observed keyframes are
fed back into the KV cache as the chunk is executed (closed-loop world modeling).
### What the LeRobot Integration Covers
- Standard `policy.type=lingbot_va` configuration through LeRobot.
- Ready-to-use LeRobot-format checkpoints on the Hub (converted from the released upstream ones).
- Autoregressive dual-stream inference behind the standard `select_action` interface
(single-environment eval, `--eval.batch_size=1`).
- Opt-in saving of the policy's **predicted (imagined) videos** during eval / training.
- Evaluation with `lerobot-eval` on LIBERO and RoboTwin.
- Training / fine-tuning via the dual-stream flow-matching loss (`policy.forward`), see below.
## Installation
1. Install LeRobot by following the [Installation Guide](./installation).
2. Install the LingBot-VA extra:
```bash
pip install -e ".[lingbot_va]"
```
## Checkpoints
The released upstream checkpoints have been converted to LeRobot format and pushed to the Hub:
| Variant | LeRobot checkpoint |
| ---------------------- | -------------------------------- |
| LIBERO-Long post-train | `lerobot/lingbot_va_libero_long` |
| RoboTwin post-train | `lerobot/lingbot_va_robotwin` |
| Pretrained base | `lerobot/lingbot_va_base` |
Only the trainable ~5B transformer is stored in the LeRobot
`model.safetensors`. The frozen VAE + UMT5 + tokenizer (~20 GB) are pulled from
`config.wan_pretrained_path` at load time (defaults to the source `robbyant/*` repo). The
UMT5-XXL text encoder runs on CPU by default (`config.text_encoder_device`) so the 5B
transformer + VAE fit on a single 2432 GB GPU.
## Evaluation (LIBERO)
```bash
lerobot-eval \
--policy.path=lerobot/lingbot_va_libero_long \
--policy.device=cuda \
--env.type=libero --env.task=libero_10 \
--env.observation_height=128 --env.observation_width=128 \
--eval.n_episodes=50 --eval.batch_size=1 \
--output_dir=outputs/eval/lingbot_va_libero
```
LingBot-VA's streaming inference (KV cache + observed-keyframe feedback) is implemented for
single-environment eval; use `--eval.batch_size=1`.
## Evaluation (RoboTwin)
RoboTwin 2.0 needs the SAPIEN + CuRobo simulator stack. You can use the benchmark Docker image
(`docker/Dockerfile.benchmark.robotwin`, which also needs `warp-lang==1.3.1` and CuRobo built
with the GPU's compute capability in `TORCH_CUDA_ARCH_LIST`). RoboTwin uses **end-effector-pose
control**, so run with `--env.action_mode=ee`: the policy predicts per-arm `xyz+quaternion+gripper`
deltas (`robotwin_tshape` latent layout) that are composed onto the episode's initial eef pose and
executed via CuRobo IK.
```bash
lerobot-eval \
--policy.path=lerobot/lingbot_va_robotwin \
--policy.device=cuda \
--env.type=robotwin --env.task=beat_block_hammer --env.action_mode=ee \
--eval.n_episodes=10 --eval.batch_size=1 \
--output_dir=outputs/eval/lingbot_va_robotwin
```
### Saving predicted (imagined) videos
Set `--policy.save_predicted_video=true` to additionally VAE-decode the predicted video
latents and write `pred_episode_*.mp4` next to the env-rendered `eval_episode_*.mp4` videos.
The same flag works for the periodic eval during `lerobot-train`.
## Training / fine-tuning
`LingBotVAPolicy.forward(batch)` implements the dual-stream **flow-matching** loss
(`latent_loss + action_loss`, timestep-weighted, action-masked) from the paper: it VAE-encodes
the camera clips into video latents, UMT5-encodes the task, noises both streams, runs the
transformer's block-causal training pass and returns `(loss, metrics)`. Optimizer preset is AdamW
with a linear-warmup-then-constant schedule (matching upstream).
Requirements:
- The block-causal masks use PyTorch **flex-attention**, so build the policy with
`--policy.attn_mode=flex` for training (the default `torch` SDPA is inference-only).
- The full 5B DiT does not fit a single 2432 GB GPU under AdamW; fine-tune with **LoRA**
(`--policy.use_peft=true`) and/or optimizer offload. `get_optim_params` returns only the
trainable (e.g. adapter) parameters; the VAE + UMT5 text encoder stay frozen.
```bash
lerobot-train \
--policy.path=lerobot/lingbot_va_libero_long --policy.attn_mode=flex \
--policy.use_peft=true \
--dataset.repo_id=<your LeRobot-format dataset> \
--batch_size=1 --steps=... --output_dir=outputs/train/lingbot_va
```
The dataset must provide camera clips (a temporal window per camera, VAE-encoded to
`frame_chunk_size` latent frames) and `frame_chunk_size * action_per_frame` action steps per item.
## Data format (action channels & camera order)
LingBot-VA is an **end-effector (Cartesian) pose** policy, it predicts EEF poses + gripper, not
joint positions. Actions live in a fixed multi-embodiment **30-dim** layout; map your robot's
action dimensions into these channels and pad the rest with `0` (`used_action_channel_ids` selects
the channels a given checkpoint actually uses):
| channels | meaning |
| -------- | ----------------------------------------------------- |
| 06 | Left-arm end-effector pose |
| 713 | Right-arm end-effector pose |
| 1420 | Left-arm joints (unused by the released checkpoints) |
| 2127 | Right-arm joints (unused by the released checkpoints) |
| 28 | Left gripper |
| 29 | Right gripper |
- **LIBERO** uses channels `06`: a 6-DoF EEF delta (xyz + rotation) + gripper (single arm).
- **RoboTwin** uses channels `[06, 28, 713, 29]`: left EEF (xyz + quaternion) + left gripper +
right EEF + right gripper (16 dims). The env converts these poses to joint trajectories via
CuRobo IK — joints are never predicted.
Joint-space datasets (or a different EEF convention) must be remapped into this schema before
fine-tuning these checkpoints.
**Camera order is fixed and order-sensitive**, per-camera latents are concatenated spatially in
`obs_cam_keys` order, so the physical camera→slot mapping must match training:
| benchmark | `obs_cam_keys` (in order) | `camera_layout` |
| --------- | ----------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------- |
| LIBERO | `observation.images.image` (agentview / 3rd-person), `observation.images.image2` (eye-in-hand wrist) | `width_concat` (latents concatenated on width) |
| RoboTwin | `observation.images.head_camera`, `observation.images.left_camera`, `observation.images.right_camera` | `robotwin_tshape` (full-res head below, two half-res wrists on top) |
The first camera is the exterior/head view and the rest are wrist views.
## Inference Hyperparameters (LIBERO)
| Key | Value |
| -------------------------------------- | --------------------------------------------------------------------------------- |
| height × width | 128 × 128 |
| cameras | `observation.images.image` (agentview), `observation.images.image2` (eye-in-hand) |
| action channels used | 06 (7-DoF arm + gripper) |
| action_per_frame / frame_chunk_size | 4 / 4 |
| attn_window | 30 |
| video / action denoising steps | 20 / 50 |
| guidance_scale / action_guidance_scale | 5 / 1 |
| snr_shift / action_snr_shift | 5.0 / 0.05 |
These are the defaults of `LingBotVAConfig`; override any of them via `--policy.<name>=...`.
## Notes
- **Attention backend:** inference uses the `torch` SDPA backend (always available). The
`flashattn` and `flex` backends are optional; `flex` is only needed for training.
- **Model size:** the DiT is ~5B params and the frozen VAE+UMT5 add ~20 GB; inference needs
roughly 1824 GB of VRAM.
## License
LingBot-VA is released under Apache-2.0. See the
[upstream repository](https://github.com/Robbyant/lingbot-va).
+2
View File
@@ -236,6 +236,7 @@ fastwam = [
]
hilserl = ["lerobot[transformers-dep]", "lerobot[dataset]", "gym-hil>=0.1.14,<0.2.0", "lerobot[grpcio-dep]", "lerobot[placo-dep]"]
vla_jepa = ["lerobot[transformers-dep]", "lerobot[diffusers-dep]", "lerobot[qwen-vl-utils-dep]"]
lingbot_va = ["lerobot[transformers-dep]", "lerobot[diffusers-dep]", "lerobot[accelerate-dep]"]
# Features
async = ["lerobot[grpcio-dep]", "lerobot[matplotlib-dep]"]
@@ -318,6 +319,7 @@ all = [
"lerobot[xvla]",
"lerobot[hilserl]",
"lerobot[vla_jepa]",
"lerobot[lingbot_va]",
"lerobot[async]",
"lerobot[dev]",
"lerobot[test]",
+7 -1
View File
@@ -757,7 +757,7 @@ class RoboTwinEnvConfig(EnvConfig):
task: str = "beat_block_hammer" # single task or comma-separated list
fps: int = 25
episode_length: int = 300
episode_length: int = 1200
obs_type: str = "pixels_agent_pos"
render_mode: str = "rgb_array"
# Available cameras from RoboTwin's aloha-agilex embodiment: head_camera
@@ -768,6 +768,9 @@ class RoboTwinEnvConfig(EnvConfig):
# must equal what SAPIEN actually renders.
observation_height: int = 240
observation_width: int = 320
# "joint": 14-d joint-space control. "ee": 16-d end-effector-pose deltas executed via CuRobo IK
# (for world-model policies like LingBot-VA that predict per-arm xyz+quaternion+gripper poses).
action_mode: str = "joint"
features: dict[str, PolicyFeature] = field(
default_factory=lambda: {
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(14,)),
@@ -784,6 +787,8 @@ class RoboTwinEnvConfig(EnvConfig):
)
def __post_init__(self):
if self.action_mode == "ee":
self.features[ACTION] = PolicyFeature(type=FeatureType.ACTION, shape=(16,))
cam_list = [c.strip() for c in self.camera_names.split(",") if c.strip()]
for cam in cam_list:
self.features[f"pixels/{cam}"] = PolicyFeature(
@@ -826,6 +831,7 @@ class RoboTwinEnvConfig(EnvConfig):
observation_height=self.observation_height,
observation_width=self.observation_width,
episode_length=self.episode_length,
action_mode=self.action_mode,
)
+169 -6
View File
@@ -17,6 +17,7 @@ from __future__ import annotations
import importlib
import logging
import os
from collections import defaultdict
from collections.abc import Callable, Sequence
from functools import partial
@@ -28,9 +29,17 @@ import torch
from gymnasium import spaces
from lerobot.types import RobotObservation
from lerobot.utils.import_utils import _scipy_available
from .utils import _LazyAsyncVectorEnv
# scipy is only used for end-effector-pose composition (``--env.action_mode=ee``); guard it so this
# module (and its base-env unit tests, which mock the RoboTwin runtime) imports without scipy installed.
if _scipy_available:
from scipy.spatial.transform import Rotation
else:
Rotation = None
logger = logging.getLogger(__name__)
# Camera names as used by RoboTwin 2.0. The wrapper appends "_rgb" when looking
@@ -41,10 +50,124 @@ ROBOTWIN_CAMERA_NAMES: tuple[str, ...] = (
"right_camera",
)
ACTION_DIM = 14 # 7 DOF × 2 arms
ACTION_DIM = 14 # 7 DOF × 2 arms (joint-space control mode)
# End-effector-pose control mode: per arm [x, y, z, qx, qy, qz, qw, gripper] = 8, dual-arm = 16.
# Used by world-model policies (e.g. LingBot-VA) that predict eef-pose deltas executed via CuRobo IK.
EEF_ACTION_DIM = 16
ACTION_LOW = -1.0
ACTION_HIGH = 1.0
DEFAULT_EPISODE_LENGTH = 300
DEFAULT_EPISODE_LENGTH = 1200
OFFICIAL_INSTRUCTION_ENV = "LEROBOT_ROBOTWIN_OFFICIAL_INSTRUCTION"
OFFICIAL_INSTRUCTION_TYPE_ENV = "LEROBOT_ROBOTWIN_INSTRUCTION_TYPE"
OFFICIAL_INSTRUCTION_MAX_ENV = "LEROBOT_ROBOTWIN_INSTRUCTION_MAX"
def _compose_eef_pose(new_pose: np.ndarray, init_pose: np.ndarray) -> np.ndarray:
"""Compose a single-arm predicted delta pose onto the initial pose.
``new_pose`` / ``init_pose`` are 8-vectors ``[x, y, z, qx, qy, qz, qw, gripper]``. Translation
is added, rotation is composed (``init_R * new_R``), and the gripper is taken from the
prediction. Mirrors ``add_eef_pose`` in the upstream LingBot-VA RoboTwin client.
"""
new_r = Rotation.from_quat(new_pose[3:7])
init_r = Rotation.from_quat(init_pose[3:7])
out_rot = (init_r * new_r).as_quat()
out_trans = new_pose[:3] + init_pose[:3]
return np.concatenate([out_trans, out_rot, new_pose[7:8]])
def _add_init_eef_pose(delta_pose: np.ndarray, init_pose: np.ndarray) -> np.ndarray:
"""Compose a dual-arm (16-d) predicted delta pose onto the initial eef pose, normalizing quats."""
left = _compose_eef_pose(delta_pose[:8], init_pose[:8])
right = _compose_eef_pose(delta_pose[8:], init_pose[8:])
out = np.concatenate([left, right])
# Normalize the two quaternions (indices 3:7 and 11:15) as the upstream client does.
out[3:7] = out[3:7] / (np.linalg.norm(out[3:7]) + 1e-8)
out[11:15] = out[11:15] / (np.linalg.norm(out[11:15]) + 1e-8)
return out
def _env_flag(name: str, default: bool = False) -> bool:
raw = os.environ.get(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def _arm_for_block(block: Any) -> str:
return "left" if float(block.get_pose().p[0]) < 0 else "right"
def _robotwin_blocks_episode_info(task_name: str, env: Any) -> dict[str, str] | None:
"""Infer the episode-info dict used by RoboTwin's official instruction generator for block ranking."""
if task_name == "blocks_ranking_rgb":
return {
"{A}": "red block",
"{B}": "green block",
"{C}": "blue block",
"{a}": _arm_for_block(env.block1),
"{b}": _arm_for_block(env.block2),
"{c}": _arm_for_block(env.block3),
}
if task_name == "blocks_ranking_size":
return {
"{A}": "large block",
"{B}": "medium block",
"{C}": "small block",
"{a}": _arm_for_block(env.block1),
"{b}": _arm_for_block(env.block2),
"{c}": _arm_for_block(env.block3),
}
return None
def _generate_robotwin_official_instruction(task_name: str, env: Any) -> str:
"""Generate language with RoboTwin's official task templates, matching its eval client."""
fallback = task_name.replace("_", " ")
episode_info = _robotwin_blocks_episode_info(task_name, env)
if episode_info is None:
logger.warning(
"Official RoboTwin instruction is not implemented for task=%s; using %r.", task_name, fallback
)
return fallback
try:
# Part of the robotwin simulator repo, this is being pulled by the docker image running robotwin
# see https://github.com/RoboTwin-Platform/RoboTwin/tree/main/description
# Used to generate the official instructions
from description.utils.generate_episode_instructions import generate_episode_descriptions
except Exception:
logger.warning(
"Failed to import RoboTwin official instruction generator; using %r.", fallback, exc_info=True
)
return fallback
instruction_type = os.environ.get(OFFICIAL_INSTRUCTION_TYPE_ENV, "seen")
try:
max_descriptions = int(os.environ.get(OFFICIAL_INSTRUCTION_MAX_ENV, "1000000"))
except ValueError:
max_descriptions = 1000000
results = generate_episode_descriptions(task_name, [episode_info], max_descriptions=max_descriptions)
if not results:
logger.warning(
"RoboTwin generated no official instructions for task=%s; using %r.", task_name, fallback
)
return fallback
options = results[0].get(instruction_type) or results[0].get("seen") or results[0].get("unseen")
if not options:
logger.warning(
"RoboTwin generated no %s official instructions for task=%s; using %r.",
instruction_type,
task_name,
fallback,
)
return fallback
return str(np.random.choice(options))
# D435 dims from task_config/_camera_config.yml (what demo_clean.yml selects).
DEFAULT_CAMERA_H = 240
DEFAULT_CAMERA_W = 320
@@ -234,6 +357,7 @@ class RoboTwinEnv(gym.Env):
observation_width: int | None = None,
episode_length: int = DEFAULT_EPISODE_LENGTH,
render_mode: str = "rgb_array",
action_mode: str = "joint",
):
super().__init__()
self.task_name = task_name
@@ -241,6 +365,13 @@ class RoboTwinEnv(gym.Env):
self.task_description = task_name.replace("_", " ")
self.episode_index = episode_index
self._reset_stride = n_envs
# "joint": 14-d joint-space actions via take_action(action). "ee": 16-d end-effector-pose
# deltas (added onto the episode's initial eef pose) executed via take_action(.., "ee") + IK.
if action_mode not in ("joint", "ee"):
raise ValueError(f"action_mode must be 'joint' or 'ee'; got {action_mode!r}")
self.action_mode = action_mode
self._action_dim = EEF_ACTION_DIM if action_mode == "ee" else ACTION_DIM
self._init_eef_pose: np.ndarray | None = None
self.camera_names = list(camera_names)
# Default to D435 dims (the camera type baked into task_config/demo_clean.yml).
# The YAML-driven lookup is deferred to reset() so construction doesn't
@@ -271,7 +402,7 @@ class RoboTwinEnv(gym.Env):
}
)
self.action_space = spaces.Box(
low=ACTION_LOW, high=ACTION_HIGH, shape=(ACTION_DIM,), dtype=np.float32
low=ACTION_LOW, high=ACTION_HIGH, shape=(self._action_dim,), dtype=np.float32
)
def _ensure_env(self) -> None:
@@ -317,6 +448,18 @@ class RoboTwinEnv(gym.Env):
return {"pixels": images, "agent_pos": joint_state}
def _read_eef_pose(self) -> np.ndarray:
"""Read the current 16-d dual-arm eef pose [left(xyz+quat)+grip, right(xyz+quat)+grip]."""
assert self._env is not None, "_read_eef_pose called before _ensure_env()"
ep = self._env.get_obs()["endpose"]
pose = (
list(ep["left_endpose"])
+ [ep["left_gripper"]]
+ list(ep["right_endpose"])
+ [ep["right_gripper"]]
)
return np.asarray(pose, dtype=np.float64)
def reset(self, seed: int | None = None, **kwargs) -> tuple[RobotObservation, dict]:
self._ensure_env()
super().reset(seed=seed)
@@ -330,16 +473,32 @@ class RoboTwinEnv(gym.Env):
self.episode_index += self._reset_stride
self._step_count = 0
use_official_instruction = self.task_name in {"blocks_ranking_rgb", "blocks_ranking_size"}
if _env_flag(OFFICIAL_INSTRUCTION_ENV, default=use_official_instruction):
self.task_description = _generate_robotwin_official_instruction(self.task_name, self._env)
if hasattr(self._env, "set_instruction"):
self._env.set_instruction(instruction=self.task_description)
logger.info("RoboTwin official instruction | task=%s | %s", self.task_name, self.task_description)
else:
self.task_description = self.task_name.replace("_", " ")
# In eef mode the policy predicts pose deltas relative to the initial eef pose.
if self.action_mode == "ee":
self._init_eef_pose = self._read_eef_pose()
obs = self._get_obs()
return obs, {"is_success": False, "task": self.task_name}
def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, dict[str, Any]]:
assert self._env is not None, "step() called before reset()"
if action.ndim != 1 or action.shape[0] != ACTION_DIM:
raise ValueError(f"Expected 1-D action of shape ({ACTION_DIM},), got {action.shape}")
if action.ndim != 1 or action.shape[0] != self._action_dim:
raise ValueError(f"Expected 1-D action of shape ({self._action_dim},), got {action.shape}")
with torch.enable_grad():
if hasattr(self._env, "take_action"):
if self.action_mode == "ee":
ee_action = _add_init_eef_pose(np.asarray(action, dtype=np.float64), self._init_eef_pose)
self._env.take_action(ee_action, action_type="ee")
elif hasattr(self._env, "take_action"):
self._env.take_action(action)
else:
self._env.step(action)
@@ -398,6 +557,7 @@ def _make_env_fns(
observation_height: int,
observation_width: int,
episode_length: int,
action_mode: str = "joint",
) -> list[Callable[[], RoboTwinEnv]]:
"""Return n_envs factory callables for a single task."""
@@ -410,6 +570,7 @@ def _make_env_fns(
observation_height=observation_height,
observation_width=observation_width,
episode_length=episode_length,
action_mode=action_mode,
)
return [partial(_make_one, i) for i in range(n_envs)]
@@ -423,6 +584,7 @@ def create_robotwin_envs(
observation_height: int = DEFAULT_CAMERA_H,
observation_width: int = DEFAULT_CAMERA_W,
episode_length: int = DEFAULT_EPISODE_LENGTH,
action_mode: str = "joint",
) -> dict[str, dict[int, Any]]:
"""Create vectorized RoboTwin 2.0 environments.
@@ -473,6 +635,7 @@ def create_robotwin_envs(
observation_height=observation_height,
observation_width=observation_width,
episode_length=episode_length,
action_mode=action_mode,
)
if is_async:
lazy = _LazyAsyncVectorEnv(fns, cached_obs_space, cached_act_space, cached_metadata)
+22
View File
@@ -83,6 +83,28 @@ class VQBeTSchedulerConfig(LRSchedulerConfig):
return LambdaLR(optimizer, lr_lambda, -1)
@LRSchedulerConfig.register_subclass("constant_with_warmup")
@dataclass
class ConstantWithWarmupSchedulerConfig(LRSchedulerConfig):
"""Linear warmup followed by a constant learning rate.
Mirrors the ``warmup_constant_lambda`` used by LingBot-VA (upstream ``wan_va/train.py``):
the LR ramps linearly from 0 to the peak over ``num_warmup_steps`` steps, then stays flat.
"""
num_warmup_steps: int = 1000
def build(self, optimizer: Optimizer, num_training_steps: int) -> LambdaLR:
warmup_steps = self.num_warmup_steps or 0
def lr_lambda(current_step):
if current_step < warmup_steps:
return float(current_step) / float(max(1, warmup_steps))
return 1.0
return LambdaLR(optimizer, lr_lambda, -1)
@LRSchedulerConfig.register_subclass("cosine_decay_with_warmup")
@dataclass
class CosineDecayWithWarmupSchedulerConfig(LRSchedulerConfig):
+2
View File
@@ -21,6 +21,7 @@ from .factory import get_policy_class, make_policy, make_policy_config, make_pre
from .fastwam.configuration_fastwam import FastWAMConfig as FastWAMConfig
from .gaussian_actor.configuration_gaussian_actor import GaussianActorConfig as GaussianActorConfig
from .groot.configuration_groot import GrootConfig as GrootConfig
from .lingbot_va.configuration_lingbot_va import LingBotVAConfig as LingBotVAConfig
from .molmoact2.configuration_molmoact2 import MolmoAct2Config as MolmoAct2Config
from .multi_task_dit.configuration_multi_task_dit import MultiTaskDiTConfig as MultiTaskDiTConfig
from .pi0.configuration_pi0 import PI0Config as PI0Config
@@ -46,6 +47,7 @@ __all__ = [
"FastWAMConfig",
"GaussianActorConfig",
"GrootConfig",
"LingBotVAConfig",
"MolmoAct2Config",
"MultiTaskDiTConfig",
"PI0Config",
+15
View File
@@ -50,6 +50,7 @@ from .eo1.configuration_eo1 import EO1Config
from .fastwam.configuration_fastwam import FastWAMConfig
from .gaussian_actor.configuration_gaussian_actor import GaussianActorConfig
from .groot.configuration_groot import GrootConfig
from .lingbot_va.configuration_lingbot_va import LingBotVAConfig
from .molmoact2.configuration_molmoact2 import MolmoAct2Config
from .multi_task_dit.configuration_multi_task_dit import MultiTaskDiTConfig
from .pi0.configuration_pi0 import PI0Config
@@ -163,6 +164,10 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
from .vla_jepa.modeling_vla_jepa import VLAJEPAPolicy
return VLAJEPAPolicy
elif name == "lingbot_va":
from .lingbot_va.modeling_lingbot_va import LingBotVAPolicy
return LingBotVAPolicy
elif name == "fastwam":
from .fastwam.modeling_fastwam import FastWAMPolicy
@@ -223,6 +228,8 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
return MolmoAct2Config(**kwargs)
elif policy_type == "vla_jepa":
return VLAJEPAConfig(**kwargs)
elif policy_type == "lingbot_va":
return LingBotVAConfig(**kwargs)
elif policy_type == "fastwam":
return FastWAMConfig(**kwargs)
else:
@@ -458,6 +465,14 @@ def make_pre_post_processors(
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, LingBotVAConfig):
from .lingbot_va.processor_lingbot_va import make_lingbot_va_pre_post_processors
processors = make_lingbot_va_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, FastWAMConfig):
from .fastwam.processor_fastwam import make_fastwam_pre_post_processors
+1
View File
@@ -0,0 +1 @@
../../../../docs/source/lingbot_va.mdx
@@ -0,0 +1,21 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .configuration_lingbot_va import LingBotVAConfig
from .modeling_lingbot_va import LingBotVAPolicy
from .processor_lingbot_va import make_lingbot_va_pre_post_processors
__all__ = ["LingBotVAConfig", "LingBotVAPolicy", "make_lingbot_va_pre_post_processors"]
@@ -0,0 +1,168 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Configuration for the LingBot-VA policy.
LingBot-VA is an autoregressive video-action world-model policy built on the Wan2.2
video-diffusion stack. It interleaves prediction of future video latents and robot
actions in a single dual-stream transformer. See ``docs/source/lingbot_va.mdx`` and the
upstream repository (https://github.com/Robbyant/lingbot-va).
Defaults below match the upstream LIBERO configuration (``wan_va/configs/va_libero_cfg.py``)
and the ``transformer/config.json`` of the released checkpoints.
"""
from dataclasses import dataclass, field
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.optim.optimizers import AdamWConfig
from lerobot.optim.schedulers import ConstantWithWarmupSchedulerConfig, LRSchedulerConfig
from lerobot.utils.constants import ACTION
@PreTrainedConfig.register_subclass("lingbot_va")
@dataclass
class LingBotVAConfig(PreTrainedConfig):
"""Configuration for the native LingBot-VA policy integration in LeRobot."""
# Wan transformer architecture
patch_size: tuple[int, int, int] = (1, 2, 2)
num_attention_heads: int = 24
attention_head_dim: int = 128
in_channels: int = 48
out_channels: int = 48
action_dim: int = 30
text_dim: int = 4096
freq_dim: int = 256
ffn_dim: int = 14336
num_layers: int = 30
cross_attn_norm: bool = True
eps: float = 1e-6
rope_max_seq_len: int = 1024
# "flex" = training only (needs recent torch); inference uses "torch" SDPA or "flashattn".
attn_mode: str = "torch"
# Frozen sub-models (VAE + UMT5 text encoder + tokenizer)
# ~20 GB of frozen weights, NOT bundled in the checkpoint; lazily pulled from this HF repo /
# local dir (must hold diffusers-style ``vae/``, ``text_encoder/``, ``tokenizer/`` sub-folders).
wan_pretrained_path: str = "robbyant/lingbot-va-base"
dtype: str = "bfloat16" # transformer / VAE / text-encoder dtype: "bfloat16", "float16", "float32"
# Frozen UMT5-XXL encoder device; "cpu" frees ~11 GB VRAM (it runs once per episode).
text_encoder_device: str = "cpu"
# Observation cameras (order matters: latents are concatenated on width; LIBERO defaults)
obs_cam_keys: list[str] = field(
default_factory=lambda: ["observation.images.image", "observation.images.image2"]
)
# Undo the LIBERO env processor's extra horizontal flip to match the model's training orientation.
image_hflip: bool = False
# Camera latent layout: "width_concat" (cameras concatenated on width; LIBERO) or
# "robotwin_tshape" (full-res head + half-res wrists in a "T"; RoboTwin).
camera_layout: str = "width_concat"
# Inference hyperparameters (LIBERO defaults)
n_obs_steps: int = 1
height: int = 128
width: int = 128
action_per_frame: int = 4
frame_chunk_size: int = 4
attn_window: int = 30
num_inference_steps: int = 20
video_exec_step: int = -1
action_num_inference_steps: int = 50
guidance_scale: float = 5.0
action_guidance_scale: float = 1.0
snr_shift: float = 5.0
action_snr_shift: float = 0.05
max_sequence_length: int = 512 # UMT5 prompt length
# Subset of the 30-d action space used by the benchmark (LIBERO = 7-DoF). The action
# (un)normalization quantiles live in the checkpoint's ``policy_postprocessor.json``, not here.
used_action_channel_ids: list[int] = field(default_factory=lambda: list(range(7)))
# Opt-in: VAE-decode predicted video latents to ``self.last_predicted_frames`` for saving MP4s.
save_predicted_video: bool = False
# Normalization: IDENTITY here; images are scaled + VAE-encoded and actions are
# quantile-(un)normalized inside the policy / dedicated processor steps.
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
"VISUAL": NormalizationMode.IDENTITY,
"STATE": NormalizationMode.IDENTITY,
"ACTION": NormalizationMode.IDENTITY,
}
)
# Optimizer / scheduler (training; AdamW + warmup-constant per upstream train.py)
optimizer_lr: float = 1e-5
optimizer_betas: tuple[float, float] = (0.9, 0.95)
optimizer_eps: float = 1e-8
optimizer_weight_decay: float = 1e-4
optimizer_grad_clip_norm: float = 1.0
scheduler_warmup_steps: int = 1000
def __post_init__(self):
super().__post_init__()
if self.attn_mode not in ("torch", "flashattn", "flex"):
raise ValueError(f"attn_mode must be one of 'torch', 'flashattn', 'flex'; got {self.attn_mode!r}")
@property
def chunk_size(self) -> int:
"""Number of single-step actions produced per autoregressive chunk."""
return self.frame_chunk_size * self.action_per_frame
@property
def n_action_steps(self) -> int:
"""Number of actions executed before refilling (the whole chunk)."""
return self.chunk_size
def validate_features(self) -> None:
image_features = [key for key, feat in self.input_features.items() if feat.type == FeatureType.VISUAL]
if not image_features:
raise ValueError(
"LingBot-VA requires at least one visual input feature. "
"No features of type FeatureType.VISUAL found in input_features."
)
if ACTION not in self.output_features:
self.output_features[ACTION] = PolicyFeature(
type=FeatureType.ACTION, shape=(len(self.used_action_channel_ids),)
)
def get_optimizer_preset(self) -> AdamWConfig:
return AdamWConfig(
lr=self.optimizer_lr,
betas=self.optimizer_betas,
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
grad_clip_norm=self.optimizer_grad_clip_norm,
)
def get_scheduler_preset(self) -> LRSchedulerConfig | None:
# Upstream uses a linear warmup followed by a constant LR (warmup_constant_lambda).
return ConstantWithWarmupSchedulerConfig(num_warmup_steps=self.scheduler_warmup_steps)
@property
def observation_delta_indices(self) -> list[int]:
temporal_downsample = 4
stride = max(1, self.action_per_frame // temporal_downsample)
return list(range(0, self.frame_chunk_size * temporal_downsample * stride, stride))
@property
def action_delta_indices(self) -> list[int]:
return list(range(self.chunk_size))
@property
def reward_delta_indices(self) -> None:
return None
@@ -0,0 +1,853 @@
# Copyright 2024-2025 The Robbyant Team Authors. All rights reserved.
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""LingBot-VA policy: an autoregressive video-action world model on the Wan2.2 stack.
The sampling loop is a faithful re-implementation of the upstream streaming server
(``wan_va/wan_va_server.py``) and LIBERO client (``evaluation/libero/client.py``), adapted
to LeRobot's ``select_action`` interface:
* the trainable dual-stream transformer is owned as a sub-module and round-trips in the
single ``model.safetensors`` checkpoint;
* the frozen Wan VAE + UMT5 text encoder + tokenizer are *lazily pulled* from
``config.wan_pretrained_path`` (not bundled), so the LeRobot checkpoint stays small;
* ``predict_action_chunk`` runs one autoregressive chunk (video stream then action
stream, each with CFG and its own flow-matching scheduler) and updates the KV cache;
* ``select_action`` drains a per-step action queue and records the real observed
keyframes that are fed back into the KV cache when the queue is refilled.
NOTE: The streaming path is written for single-environment eval (``--eval.batch_size=1``).
"""
from collections import deque
import torch
import torch.nn.functional as F # noqa: N812
from einops import rearrange
from torch import Tensor
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.utils.constants import ACTION
from lerobot.utils.import_utils import require_package
from .configuration_lingbot_va import LingBotVAConfig
from .utils import (
FlowMatchScheduler,
WanTransformer3DModel,
WanVAEStreamingWrapper,
_sample_timestep_id,
_torch_dtype,
clean_prompt,
data_seq_to_patch,
denormalize_latents,
get_mesh_id,
load_text_encoder,
load_tokenizer,
load_vae,
)
class LingBotVAPolicy(PreTrainedPolicy):
"""LeRobot wrapper for the LingBot-VA autoregressive video-action world model."""
config_class = LingBotVAConfig
name = "lingbot_va"
def __init__(self, config: LingBotVAConfig, **kwargs):
require_package("diffusers", extra="lingbot_va")
require_package("transformers", extra="lingbot_va")
super().__init__(config)
config.validate_features()
self.config = config
self.dtype = _torch_dtype(config.dtype)
# Trainable dual-stream transformer (the only sub-module saved in the LeRobot checkpoint).
self.transformer = WanTransformer3DModel(
patch_size=tuple(config.patch_size),
num_attention_heads=config.num_attention_heads,
attention_head_dim=config.attention_head_dim,
in_channels=config.in_channels,
out_channels=config.out_channels,
action_dim=config.action_dim,
text_dim=config.text_dim,
freq_dim=config.freq_dim,
ffn_dim=config.ffn_dim,
num_layers=config.num_layers,
cross_attn_norm=config.cross_attn_norm,
eps=config.eps,
rope_max_seq_len=config.rope_max_seq_len,
attn_mode=config.attn_mode,
)
# Run the transformer in config.dtype (bf16); norm/modulation paths upcast to fp32 internally.
self.transformer = self.transformer.to(self.dtype)
# Frozen modules are stored OUTSIDE the nn.Module registry (plain dict) so they are
# neither saved into model.safetensors nor moved by ``.to()``. They are lazily loaded
# from ``config.wan_pretrained_path`` the first time inference runs.
self._frozen: dict = {}
self.last_predicted_frames: Tensor | None = None
self.last_predicted_latents: Tensor | None = None
self.reset()
# Frozen-module lazy loading (VAE + UMT5 + tokenizer)
def _ensure_frozen_modules(self):
if self._frozen:
return
path = self.config.wan_pretrained_path
device = self.config.device
# The frozen modules always live in ``vae/``, ``text_encoder/`` and ``tokenizer/``
# sub-folders -- both in the released diffusers-style HF repos and in the local
# ``--bundle-frozen`` output dir. ``from_pretrained(path, subfolder=...)`` resolves
# them for either a HF repo id or a local directory.
vae = load_vae(path, torch_dtype=self.dtype, torch_device=device, subfolder="vae")
# The UMT5-XXL text encoder (~11 GB) runs once per episode; keep it on its own
# (CPU by default) device so the 5B transformer + VAE fit on a single GPU.
text_encoder = load_text_encoder(
path,
torch_dtype=self.dtype,
torch_device=self.config.text_encoder_device,
subfolder="text_encoder",
)
tokenizer = load_tokenizer(path, subfolder="tokenizer")
self._frozen = {
"vae": vae.eval(),
"streaming_vae": WanVAEStreamingWrapper(vae),
"text_encoder": text_encoder.eval(),
"tokenizer": tokenizer,
}
# RoboTwin's T-shape layout encodes the half-resolution wrist cameras through a second
# streaming VAE (separate causal cache) alongside the full-res head camera.
if self.config.camera_layout == "robotwin_tshape":
vae_half = load_vae(path, torch_dtype=self.dtype, torch_device=device, subfolder="vae")
self._frozen["streaming_vae_half"] = WanVAEStreamingWrapper(vae_half.eval())
@property
def _vae(self):
return self._frozen["vae"]
@property
def _streaming_vae(self):
return self._frozen["streaming_vae"]
# PreTrainedPolicy API
def get_optim_params(self) -> dict:
# Only the transformer is trainable; the VAE / text encoder stay frozen (kept outside the
# nn.Module registry). With PEFT/LoRA this naturally returns just the adapter params.
return [p for p in self.transformer.parameters() if p.requires_grad]
def reset(self):
"""Reset all per-episode streaming state (KV cache, queues, frame counter)."""
cfg = self.config
self._action_queue: deque = deque(maxlen=cfg.n_action_steps)
self._obs_buffer: list = [] # raw keyframe obs (one per env substep) observed this chunk
self._executed_actions: Tensor | None = (
None # last chunk's actions (model-normalized) for KV feedback
)
self._started = False # first select_action call uses the obs as the conditioning frame
self._exec_step = 0 # index of the action being executed within the current chunk
self._prev_j = 0 # sub-step index (within a predicted frame) of the last executed action
# Sample one keyframe every ``action_per_frame / temporal_downsample`` executed sub-steps so
# that exactly ``frame_chunk_size * temporal_downsample`` frames are VAE-encoded per chunk
# (the Wan2.2 VAE temporal downsample is 4 -> ``frame_chunk_size`` latent frames).
self._keyframe_stride = max(1, cfg.action_per_frame // 4)
self._frame_st_id = 0
self._first_chunk = True
self._prompt: str | None = None
self._prompt_embeds = None
self._negative_prompt_embeds = None
self.last_predicted_frames = None
self.last_predicted_latents = None
self._use_cfg = (cfg.guidance_scale > 1) or (cfg.action_guidance_scale > 1)
# Two independent flow-matching schedulers (video latent + action streams).
self._scheduler = FlowMatchScheduler(shift=cfg.snr_shift, sigma_min=0.0, extra_one_step=True)
self._action_scheduler = FlowMatchScheduler(
shift=cfg.action_snr_shift, sigma_min=0.0, extra_one_step=True
)
self._scheduler.set_timesteps(1000, training=True)
self._action_scheduler.set_timesteps(1000, training=True)
self._cache_initialised = False
# Clear KV cache on the (already-built) transformer, if present.
if hasattr(self, "transformer"):
self.transformer.clear_cache("pos")
# Reset the causal streaming-VAE feat cache between episodes (mirrors upstream ``_reset``).
# Without this the encoder carries over the previous episode's temporal state, corrupting the
# latent frame counts on the next episode's first encode.
if self._frozen:
self._frozen["streaming_vae"].clear_cache()
if "streaming_vae_half" in self._frozen:
self._frozen["streaming_vae_half"].clear_cache()
# Training (flow-matching dual-stream loss). Requires attn_mode="flex".
def _ensure_train_schedulers(self):
if getattr(self, "_train_sched_latent", None) is None:
cfg = self.config
self._train_sched_latent = FlowMatchScheduler(
shift=cfg.snr_shift, sigma_min=0.0, extra_one_step=True
)
self._train_sched_latent.set_timesteps(1000, training=True)
self._train_sched_action = FlowMatchScheduler(
shift=cfg.action_snr_shift, sigma_min=0.0, extra_one_step=True
)
self._train_sched_action.set_timesteps(1000, training=True)
@torch.no_grad()
def _add_noise_stream(self, latent, scheduler, action_mask, action_mode, noisy_cond_prob):
"""Flow-matching noising of one stream (port of upstream ``Trainer._add_noise``)."""
device = latent.device
b, _c, f, _h, _w = latent.shape
p = self.config.patch_size
patch_f, patch_h, patch_w = (1, 1, 1) if action_mode else (p[0], p[1], p[2])
ts_ids = _sample_timestep_id(f, num_train_timesteps=scheduler.num_train_timesteps)
noise = torch.zeros_like(latent).normal_()
timesteps = scheduler.timesteps[ts_ids].to(device)
noisy_latents = scheduler.add_noise(latent, noise, timesteps, t_dim=2)
targets = scheduler.training_target(latent, noise, timesteps)
grid_id = (
get_mesh_id(
latent.shape[-3] // patch_f,
latent.shape[-2] // patch_h,
latent.shape[-1] // patch_w,
t=1 if action_mode else 0,
f_w=1,
f_shift=0,
action=action_mode,
)
.to(device)[None]
.repeat(b, 1, 1)
)
if torch.rand(1).item() < noisy_cond_prob:
cond_ids = _sample_timestep_id(
f, min_timestep_bd=0.5, max_timestep_bd=1.0, num_train_timesteps=scheduler.num_train_timesteps
)
cond_noise = torch.zeros_like(latent).normal_()
cond_timesteps = scheduler.timesteps[cond_ids].to(device)
latent = scheduler.add_noise(latent, cond_noise, cond_timesteps, t_dim=2)
else:
cond_timesteps = torch.zeros_like(timesteps)
if action_mask is not None:
noisy_latents = noisy_latents * action_mask.float()
targets = targets * action_mask.float()
latent = latent * action_mask.float()
return {
"timesteps": timesteps[None].repeat(b, 1),
"noisy_latents": noisy_latents,
"targets": targets,
"latent": latent,
"cond_timesteps": cond_timesteps[None].repeat(b, 1),
"grid_id": grid_id,
}
def _flow_matching_loss(self, input_dict, pred):
"""Dual-stream flow-matching loss (port of upstream ``Trainer.compute_loss``)."""
latent_pred, action_pred = pred
ld, ad = input_dict["latent_dict"], input_dict["action_dict"]
action_pred = rearrange(action_pred, "b (f n) c -> b c f n 1", f=ad["targets"].shape[-3])
latent_pred = data_seq_to_patch(
self.config.patch_size,
latent_pred,
ld["targets"].shape[-3],
ld["targets"].shape[-2],
ld["targets"].shape[-1],
batch_size=latent_pred.shape[0],
)
bn, fn = ld["timesteps"].shape
lw = self._train_sched_latent.training_weight(ld["timesteps"].flatten()).reshape(bn, fn)
aw = self._train_sched_action.training_weight(ad["timesteps"].flatten()).reshape(bn, fn)
latent_loss = F.mse_loss(latent_pred.float(), ld["targets"].float().detach(), reduction="none")
latent_loss = (
(latent_loss * lw[:, None, :, None, None]).permute(0, 2, 3, 4, 1).flatten(0, 1).flatten(1)
)
latent_loss = (latent_loss.sum(dim=1) / (torch.ones_like(latent_loss).sum(dim=1) + 1e-6)).mean()
amask = ad["actions_mask"].float()
action_loss = F.mse_loss(action_pred.float(), ad["targets"].float().detach(), reduction="none")
action_loss = (
(action_loss * aw[:, None, :, None, None] * amask).permute(0, 2, 3, 4, 1).flatten(0, 1).flatten(1)
)
amask_f = amask.permute(0, 2, 3, 4, 1).flatten(0, 1).flatten(1)
action_loss = (action_loss.sum(dim=1) / (amask_f.sum(dim=1) + 1e-6)).mean()
return latent_loss, action_loss
def training_loss_from_streams(self, latents, actions, actions_mask, text_emb):
"""Core dual-stream training loss given prepared latents / actions / text embeddings.
``latents``: ``[B, in_channels, F, h, w]`` (normalized video latents).
``actions`` / ``actions_mask``: ``[B, action_dim, F, action_per_frame, 1]``.
``text_emb``: ``[B, seq_len, text_dim]``. Returns ``(loss, {latent_loss, action_loss})``.
"""
if self.config.attn_mode != "flex":
raise ValueError(
"LingBot-VA training requires attn_mode='flex' (block-causal flow-matching masks). "
"Load/convert the policy with --policy.attn_mode=flex for training/fine-tuning."
)
self._ensure_train_schedulers()
latent_dict = self._add_noise_stream(
latents, self._train_sched_latent, action_mask=None, action_mode=False, noisy_cond_prob=0.5
)
action_dict = self._add_noise_stream(
actions, self._train_sched_action, action_mask=actions_mask, action_mode=True, noisy_cond_prob=0.0
)
latent_dict["text_emb"] = text_emb
action_dict["text_emb"] = text_emb
action_dict["actions_mask"] = actions_mask
input_dict = {
"latent_dict": latent_dict,
"action_dict": action_dict,
"chunk_size": int(torch.randint(1, 5, (1,)).item()),
"window_size": int(torch.randint(4, 65, (1,)).item()),
}
pred = self.transformer(input_dict, train_mode=True)
latent_loss, action_loss = self._flow_matching_loss(input_dict, pred)
loss = latent_loss + action_loss
return loss, {"latent_loss": latent_loss.item(), "action_loss": action_loss.item()}
def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict | None]:
"""Training forward: dual-stream flow-matching loss.
Builds the (video-latent, action, text) training streams from a LeRobot batch
(VAE-encoding the camera frames and UMT5-encoding the task), then runs the flow-matching
dual-stream loss. Requires the policy to be built with ``attn_mode='flex'``.
"""
self._ensure_frozen_modules()
latents, actions, actions_mask, text_emb = self._build_training_streams(batch)
return self.training_loss_from_streams(latents, actions, actions_mask, text_emb)
@torch.no_grad()
def _build_training_streams(self, batch):
"""Build (latents, actions, actions_mask, text_emb) from a LeRobot training batch.
Camera frames per ``obs_cam_keys`` are expected as a temporal clip ``[B, C, T, H, W]`` (or
``[B, T, C, H, W]``); they are VAE-encoded into ``F = T / temporal_downsample`` latent frames.
Actions ``[B, F*action_per_frame, n_used]`` are scattered into the model's ``action_dim`` space.
"""
cfg = self.config
device = cfg.device
# text embeddings
task = batch.get("task")
if isinstance(task, str):
task = [task]
text_emb = self._get_t5_prompt_embeds(list(task), cfg.max_sequence_length)
# video latents (VAE-encode the camera clips)
latents = self._encode_training_latents(batch)
# actions -> [B, action_dim, F, action_per_frame, 1]
act = batch[ACTION].to(device) # [B, F*apf, n_used]
b = act.shape[0]
used = cfg.used_action_channel_ids
apf, fc = cfg.action_per_frame, cfg.frame_chunk_size
act = act[:, : fc * apf].reshape(b, fc, apf, len(used)).permute(0, 3, 1, 2) # [B, n_used, F, apf]
full = act.new_zeros(b, cfg.action_dim, fc, apf)
idx = torch.as_tensor(used, device=device)
full[:, idx] = act
actions = full.unsqueeze(-1).to(self.dtype) # [B, action_dim, F, apf, 1]
mask = torch.zeros(cfg.action_dim, device=device, dtype=self.dtype)
mask[idx] = 1.0
actions_mask = mask.view(1, -1, 1, 1, 1).expand_as(actions)
return latents, actions, actions_mask, text_emb
@torch.no_grad()
def _encode_training_latents(self, batch) -> Tensor:
"""VAE-encode the per-camera training clips into normalized video latents [B, C, F, h, w]."""
vae_device = next(self._vae.parameters()).device
def _clip(key):
x = batch[key].to(vae_device)
if x.dim() == 4: # [B, C, H, W] -> single frame clip
x = x.unsqueeze(2)
elif x.shape[1] not in (1, 3) and x.shape[2] in (1, 3): # [B, T, C, H, W] -> [B, C, T, H, W]
x = x.permute(0, 2, 1, 3, 4)
return x.contiguous()
def _encode(x, size):
b, c, t = x.shape[:3]
x = F.interpolate(x.flatten(0, 1).float(), size=size, mode="bilinear", align_corners=False)
x = (x.view(b, c, t, *size) * 2.0 - 1.0).to(self.dtype)
mu = self._vae.encode(x).latent_dist.mode() # [B, z_dim, F, h, w]
mean = torch.tensor(self._vae.config.latents_mean).view(1, -1, 1, 1, 1).to(mu.device)
inv_std = (1.0 / torch.tensor(self._vae.config.latents_std)).view(1, -1, 1, 1, 1).to(mu.device)
return ((mu.float() - mean) * inv_std).to(mu)
keys = self.config.obs_cam_keys
if self.config.camera_layout == "robotwin_tshape":
h, w = self.config.height, self.config.width
head = _encode(_clip(keys[0]), (h, w))
left = _encode(_clip(keys[1]), (h // 2, w // 2))
right = _encode(_clip(keys[2]), (h // 2, w // 2))
return torch.cat([torch.cat([left, right], dim=-1), head], dim=-2).to(self.config.device)
per_cam = [_encode(_clip(k), (self.config.height, self.config.width)) for k in keys]
return torch.cat(per_cam, dim=-1).to(self.config.device)
@torch.no_grad()
def select_action(self, batch: dict[str, Tensor], **kwargs) -> Tensor:
"""Return one action, refilling the chunk (and feeding back observed keyframes) as needed.
Mirrors the upstream LIBERO client loop (``evaluation/libero/client.py``): the first obs is
the conditioning frame; every observation produced afterwards is buffered as a keyframe and,
once the chunk's actions are exhausted, the buffered frames + executed actions are fed back
into the KV cache before the next chunk is predicted.
"""
self.eval()
self._ensure_frozen_modules()
self._maybe_init_prompt(batch)
if not self._started:
# First call: this observation conditions the first chunk (it is *not* a keyframe).
self._started = True
actions = self.predict_action_chunk(batch) # [B, chunk_size, n_used]
self._action_queue.extend(actions.transpose(0, 1)) # [chunk_size, B, n_used]
self._obs_buffer = []
self._exec_step = 0
else:
# This observation is the result of the previously executed action -> a candidate
# keyframe. Buffer it on the sub-step boundary the upstream client samples on.
if (self._prev_j + 1) % self._keyframe_stride == 0:
self._obs_buffer.append(self._extract_raw_obs(batch))
if len(self._action_queue) == 0:
# All actions for the current chunk have been executed; feed the observed
# keyframes + executed actions back and predict the next chunk.
actions = self.predict_action_chunk(None)
self._action_queue.extend(actions.transpose(0, 1))
self._exec_step = 0
self._prev_j = self._exec_step % self.config.action_per_frame
self._exec_step += 1
return self._action_queue.popleft()
@torch.no_grad()
def predict_action_chunk(self, batch: dict[str, Tensor], **kwargs) -> Tensor:
"""Run one autoregressive chunk and return actions ``[B, chunk_size, n_used]`` (normalized)."""
self.eval()
self._ensure_frozen_modules()
self._maybe_init_prompt(batch)
is_first = self._first_chunk
if is_first:
init_latent = self._encode_frames([self._extract_raw_obs(batch)])
self._init_latent = init_latent
self._init_streaming_cache(init_latent)
self._obs_buffer = [] # frame 0 (the init obs) conditions the chunk; it is not fed back
actions, latents = self._infer(init_latent, frame_st_id=0)
self._first_chunk = False
else:
# Feed the real observed keyframes + the executed actions back into the KV cache.
self._compute_kv_cache(self._obs_buffer, self._executed_actions)
self._obs_buffer = []
actions, latents = self._infer(None, frame_st_id=self._frame_st_id)
# actions: [B, action_dim, F, action_per_frame, 1] (model-normalized). Keep for KV feedback.
self._executed_actions = actions
if self.config.save_predicted_video:
# Match upstream LingBot-VA visualization: collect chunk latents and decode the
# concatenated latent sequence once after the rollout finishes.
self.last_predicted_frames = None
self.last_predicted_latents = latents.detach().to("cpu")
# On the first chunk, frame 0 is the conditioning frame (already "known"): the upstream
# LIBERO client skips it (start_idx=1), so we drop the first frame's actions here.
used = self.config.used_action_channel_ids
a = actions[:, used] # [B, n_used, F, action_per_frame, 1]
if is_first:
a = a[:, :, 1:] # drop frame 0 -> (F-1) frames of actions
a = a.squeeze(-1).flatten(2) # [B, n_used, n_steps]
a = a.transpose(1, 2).contiguous() # [B, n_steps, n_used]
return a.to(torch.float32)
# Prompt / text encoding
def _maybe_init_prompt(self, batch):
if self._prompt_embeds is not None or batch is None:
return
task = batch.get("task")
prompt = task[0] if isinstance(task, list | tuple) else task
self._prompt = prompt or ""
self._prompt_embeds, self._negative_prompt_embeds = self._encode_prompt(self._prompt)
def _get_t5_prompt_embeds(self, prompt, max_sequence_length):
tokenizer = self._frozen["tokenizer"]
text_encoder = self._frozen["text_encoder"]
device = self.config.device
prompt = [prompt] if isinstance(prompt, str) else prompt
prompt = [clean_prompt(u) for u in prompt]
text_inputs = tokenizer(
prompt,
padding="max_length",
max_length=max_sequence_length,
truncation=True,
add_special_tokens=True,
return_attention_mask=True,
return_tensors="pt",
)
text_input_ids, mask = text_inputs.input_ids, text_inputs.attention_mask
seq_lens = mask.gt(0).sum(dim=1).long()
te_device = next(text_encoder.parameters()).device
prompt_embeds = text_encoder(text_input_ids.to(te_device), mask.to(te_device)).last_hidden_state
prompt_embeds = prompt_embeds.to(dtype=self.dtype, device=device)
prompt_embeds = [u[:v] for u, v in zip(prompt_embeds, seq_lens, strict=False)]
prompt_embeds = torch.stack(
[torch.cat([u, u.new_zeros(max_sequence_length - u.size(0), u.size(1))]) for u in prompt_embeds],
dim=0,
)
return prompt_embeds.to(device)
def _encode_prompt(self, prompt):
max_len = self.config.max_sequence_length
prompt_embeds = self._get_t5_prompt_embeds(prompt, max_len)
negative_prompt_embeds = None
if self._use_cfg:
negative_prompt_embeds = self._get_t5_prompt_embeds("", max_len)
return prompt_embeds, negative_prompt_embeds
# Observation (image) encoding -> normalized video latents
def _extract_raw_obs(self, batch) -> dict[str, Tensor]:
"""Snapshot the configured camera images from a batch (kept raw for later VAE encoding)."""
return {k: batch[k].detach() for k in self.config.obs_cam_keys}
def _camera_frame(self, raw_obs, key, size=None) -> Tensor:
"""Return a single-frame camera tensor [1, C, 1, H, W] resized + scaled to [-1, 1]."""
img = raw_obs[key]
if img.dim() == 3: # [C, H, W]
img = img.unsqueeze(0)
# LeRobot images arrive as float in [0, 1], shape [B, C, H, W].
img = img.to(self.config.device, torch.float32)
if self.config.image_hflip:
img = torch.flip(img, dims=[-1]) # undo the env processor's horizontal flip
if size is None:
size = (self.config.height, self.config.width)
img = F.interpolate(img, size=size, mode="bilinear", align_corners=False)
img = img * 2.0 - 1.0
return img.unsqueeze(2).to(self.dtype) # [1, C, F=1, H, W]
def _normalize_vae_latent(self, enc_out: Tensor) -> Tensor:
"""Take the mean of a VAE encoder output and channel-normalize it (matches upstream)."""
mu, _logvar = torch.chunk(enc_out, 2, dim=1)
latents_mean = torch.tensor(self._vae.config.latents_mean).to(mu.device)
latents_std = torch.tensor(self._vae.config.latents_std).to(mu.device)
mean = latents_mean.view(1, -1, 1, 1, 1)
inv_std = (1.0 / latents_std).view(1, -1, 1, 1, 1)
return ((mu.float() - mean) * inv_std).to(mu)
@torch.no_grad()
def _encode_frames(self, raw_frames: list) -> Tensor:
"""VAE-encode a temporal clip of observed frames and concat the per-camera latents on width.
``raw_frames`` is a list of per-frame obs dicts (one per env sub-step). Each configured
camera is stacked along the temporal axis into a ``[1, C, F, H, W]`` clip and encoded in a
single streaming ``encode_chunk`` call so the VAE temporal downsample (x4) collapses the F
input frames into ``F / 4`` latent frames, with the causal ``feat_cache`` carried across
chunks (mirrors upstream ``_encode_obs``).
"""
vae_device = next(self._vae.parameters()).device
if self.config.camera_layout == "robotwin_tshape":
return self._encode_frames_tshape(raw_frames, vae_device)
per_cam_videos = []
for k in self.config.obs_cam_keys:
frames = [self._camera_frame(fb, k) for fb in raw_frames]
per_cam_videos.append(torch.cat(frames, dim=2)) # [1, C, F, H, W]
videos = torch.cat(per_cam_videos, dim=0) # [num_cam, C, F, H, W]
enc_out = self._streaming_vae.encode_chunk(videos.to(vae_device).to(self.dtype))
mu_norm = self._normalize_vae_latent(enc_out)
# Concatenate the per-camera latents along width.
video_latent = torch.cat(mu_norm.split(1, dim=0), dim=-1)
return video_latent.to(self.config.device)
@torch.no_grad()
def _encode_frames_tshape(self, raw_frames: list, vae_device) -> Tensor:
"""RoboTwin T-shape latent assembly: full-res head + half-res wrists (second streaming VAE).
The two wrist latents are concatenated on width and stacked (on the height axis) on top of
the head latent, mirroring upstream ``_encode_obs`` for ``env_type='robotwin_tshape'``.
"""
cfg = self.config
h, w = cfg.height, cfg.width
head_key, left_key, right_key = cfg.obs_cam_keys[0], cfg.obs_cam_keys[1], cfg.obs_cam_keys[2]
head = torch.cat([self._camera_frame(fb, head_key, size=(h, w)) for fb in raw_frames], dim=2)
left = torch.cat(
[self._camera_frame(fb, left_key, size=(h // 2, w // 2)) for fb in raw_frames], dim=2
)
right = torch.cat(
[self._camera_frame(fb, right_key, size=(h // 2, w // 2)) for fb in raw_frames], dim=2
)
wrists = torch.cat([left, right], dim=0) # [2, C, F, H/2, W/2]
enc_high = self._streaming_vae.encode_chunk(head.to(vae_device).to(self.dtype))
enc_lr = self._frozen["streaming_vae_half"].encode_chunk(wrists.to(vae_device).to(self.dtype))
# wrists side-by-side on width, then stacked on top of the head latent on the height axis.
enc_out = torch.cat([torch.cat(enc_lr.split(1, dim=0), dim=-1), enc_high], dim=-2)
video_latent = self._normalize_vae_latent(enc_out)
return video_latent.to(self.config.device)
# KV cache management
@property
def _latent_hw(self):
if self.config.camera_layout == "robotwin_tshape":
# head (full) on the bottom, two half-res wrists side-by-side on top -> 1.5x height.
return ((self.config.height // 16) * 3) // 2, self.config.width // 16
h = self.config.height // 16
w = (self.config.width // 16) * len(self.config.obs_cam_keys)
return h, w
def _init_streaming_cache(self, init_latent):
cfg = self.config
latent_h, latent_w = self._latent_hw
p = cfg.patch_size
latent_token_per_chunk = (cfg.frame_chunk_size * latent_h * latent_w) // (p[0] * p[1] * p[2])
action_token_per_chunk = cfg.frame_chunk_size * cfg.action_per_frame
self.transformer.create_empty_cache(
"pos",
cfg.attn_window,
latent_token_per_chunk,
action_token_per_chunk,
device=self.config.device,
dtype=self.dtype,
batch_size=2 if self._use_cfg else 1,
)
self._cache_initialised = True
def _repeat_input_for_cfg(self, input_dict):
if self._use_cfg:
input_dict["noisy_latents"] = input_dict["noisy_latents"].repeat(2, 1, 1, 1, 1)
input_dict["text_emb"] = torch.cat(
[
self._prompt_embeds.to(self.dtype).clone(),
self._negative_prompt_embeds.to(self.dtype).clone(),
],
dim=0,
)
input_dict["grid_id"] = input_dict["grid_id"][None].repeat(2, 1, 1)
input_dict["timesteps"] = input_dict["timesteps"][None].repeat(2, 1)
else:
input_dict["grid_id"] = input_dict["grid_id"][None]
input_dict["timesteps"] = input_dict["timesteps"][None]
return input_dict
def _prepare_latent_input(
self,
latent_model_input,
action_model_input,
latent_t=0,
action_t=0,
latent_cond=None,
action_cond=None,
frame_st_id=0,
):
cfg = self.config
device = self.config.device
p = cfg.patch_size
out = {}
if latent_model_input is not None:
out["latent_res_lst"] = {
"noisy_latents": latent_model_input,
"timesteps": torch.ones([latent_model_input.shape[2]], dtype=torch.float32, device=device)
* latent_t,
"grid_id": get_mesh_id(
latent_model_input.shape[-3] // p[0],
latent_model_input.shape[-2] // p[1],
latent_model_input.shape[-1] // p[2],
0,
1,
frame_st_id,
).to(device),
"text_emb": self._prompt_embeds.to(self.dtype).clone(),
}
if latent_cond is not None:
out["latent_res_lst"]["noisy_latents"][:, :, 0:1] = latent_cond[:, :, 0:1]
out["latent_res_lst"]["timesteps"][0:1] *= 0
if action_model_input is not None:
out["action_res_lst"] = {
"noisy_latents": action_model_input,
"timesteps": torch.ones([action_model_input.shape[2]], dtype=torch.float32, device=device)
* action_t,
"grid_id": get_mesh_id(
action_model_input.shape[-3],
action_model_input.shape[-2],
action_model_input.shape[-1],
1,
1,
frame_st_id,
action=True,
).to(device),
"text_emb": self._prompt_embeds.to(self.dtype).clone(),
}
if action_cond is not None:
out["action_res_lst"]["noisy_latents"][:, :, 0:1] = action_cond[:, :, 0:1]
out["action_res_lst"]["timesteps"][0:1] *= 0
out["action_res_lst"]["noisy_latents"][:, ~self._action_mask] *= 0
return out
@property
def _action_mask(self):
mask = torch.zeros([self.config.action_dim], dtype=torch.bool)
mask[self.config.used_action_channel_ids] = True
return mask
# Action conditioning (executed action history) (de)normalization
def _preprocess_action_state(self, action_norm: Tensor) -> Tensor:
"""Build the action-conditioning tensor from the already-normalized executed actions.
``action_norm`` is the model-space action chunk ``[B, action_dim, F, action_per_frame, 1]``.
Upstream re-derives the conditioning from the raw executed action via quantile norm; here
the executed actions are already in the model-normalized space, so we pass them through.
"""
return action_norm.to(self.config.device, self.dtype)
def _compute_kv_cache(self, obs_buffer, executed_actions):
"""Feed real observed keyframes + executed actions back into the KV cache."""
if not obs_buffer or executed_actions is None:
return
self.transformer.clear_pred_cache("pos")
# Encode the buffered keyframe clip in one streaming call (carries the causal VAE cache).
latent_model_input = self._encode_frames(obs_buffer)
# On the first feedback, prepend the init latent so the latent/action frame counts align
# (upstream prepends ``init_latent`` to the observed keyframes when frame_st_id == 0).
if self._frame_st_id == 0 and getattr(self, "_init_latent", None) is not None:
latent_model_input = torch.cat([self._init_latent, latent_model_input], dim=2)
action_model_input = self._preprocess_action_state(executed_actions)
action_model_input = action_model_input.to(latent_model_input)
input_dict = self._prepare_latent_input(
latent_model_input, action_model_input, frame_st_id=self._frame_st_id
)
with torch.no_grad():
self.transformer(
self._repeat_input_for_cfg(input_dict["latent_res_lst"]),
update_cache=2,
cache_name="pos",
action_mode=False,
)
self.transformer(
self._repeat_input_for_cfg(input_dict["action_res_lst"]),
update_cache=2,
cache_name="pos",
action_mode=True,
)
self._frame_st_id += latent_model_input.shape[2]
# The core dual-stream denoising loop (one chunk)
@torch.no_grad()
def _infer(self, init_latent, frame_st_id=0):
cfg = self.config
device = self.config.device
latent_h, latent_w = self._latent_hw
frame_chunk_size = cfg.frame_chunk_size
latents = torch.randn(1, 48, frame_chunk_size, latent_h, latent_w, device=device, dtype=self.dtype)
actions = torch.randn(
1, cfg.action_dim, frame_chunk_size, cfg.action_per_frame, 1, device=device, dtype=self.dtype
)
self._scheduler.set_timesteps(cfg.num_inference_steps)
self._action_scheduler.set_timesteps(cfg.action_num_inference_steps)
timesteps = F.pad(self._scheduler.timesteps, (0, 1), mode="constant", value=0)
if cfg.video_exec_step != -1:
timesteps = timesteps[: cfg.video_exec_step]
action_timesteps = F.pad(self._action_scheduler.timesteps, (0, 1), mode="constant", value=0)
# 1. Video-latent denoising loop
for i, t in enumerate(timesteps):
last_step = i == len(timesteps) - 1
latent_cond = (
init_latent[:, :, 0:1].to(self.dtype)
if frame_st_id == 0 and init_latent is not None
else None
)
input_dict = self._prepare_latent_input(
latents, None, t, t, latent_cond, None, frame_st_id=frame_st_id
)
video_noise_pred = self.transformer(
self._repeat_input_for_cfg(input_dict["latent_res_lst"]),
update_cache=1 if last_step else 0,
cache_name="pos",
action_mode=False,
)
if not last_step or cfg.video_exec_step != -1:
video_noise_pred = data_seq_to_patch(
cfg.patch_size,
video_noise_pred,
frame_chunk_size,
latent_h,
latent_w,
batch_size=2 if self._use_cfg else 1,
)
if cfg.guidance_scale > 1:
video_noise_pred = video_noise_pred[1:] + cfg.guidance_scale * (
video_noise_pred[:1] - video_noise_pred[1:]
)
else:
video_noise_pred = video_noise_pred[:1]
latents = self._scheduler.step(video_noise_pred, t, latents, return_dict=False)
if frame_st_id == 0 and latent_cond is not None:
latents[:, :, 0:1] = latent_cond
# 2. Action denoising loop
for i, t in enumerate(action_timesteps):
last_step = i == len(action_timesteps) - 1
action_cond = (
torch.zeros([1, cfg.action_dim, 1, cfg.action_per_frame, 1], device=device, dtype=self.dtype)
if frame_st_id == 0
else None
)
input_dict = self._prepare_latent_input(
None, actions, t, t, None, action_cond, frame_st_id=frame_st_id
)
action_noise_pred = self.transformer(
self._repeat_input_for_cfg(input_dict["action_res_lst"]),
update_cache=1 if last_step else 0,
cache_name="pos",
action_mode=True,
)
if not last_step:
action_noise_pred = rearrange(action_noise_pred, "b (f n) c -> b c f n 1", f=frame_chunk_size)
if cfg.action_guidance_scale > 1:
action_noise_pred = action_noise_pred[1:] + cfg.action_guidance_scale * (
action_noise_pred[:1] - action_noise_pred[1:]
)
else:
action_noise_pred = action_noise_pred[:1]
actions = self._action_scheduler.step(action_noise_pred, t, actions, return_dict=False)
if frame_st_id == 0 and action_cond is not None:
actions[:, :, 0:1] = action_cond
actions[:, ~self._action_mask] *= 0
return actions, latents
# Predicted-video decoding (opt-in)
@torch.no_grad()
def decode_predicted_latents(self, latents) -> Tensor:
"""Decode a concatenated predicted-latent sequence into ``[T, H, W, 3]`` uint8 frames."""
return self._decode_predicted_video(latents)
@torch.no_grad()
def _decode_predicted_video(self, latents) -> Tensor:
"""VAE-decode predicted latents into a uint8 frame stack ``[T, H, W, 3]`` on CPU."""
vae = self._vae
z_dim = vae.config.z_dim
vae_device = next(vae.parameters()).device
latents = latents.to(device=vae_device, dtype=vae.dtype)
latents = denormalize_latents(latents, vae.config.latents_mean, vae.config.latents_std, z_dim)
video = vae.decode(latents, return_dict=False)[0] # [B, C, F, H, W] in [-1, 1]
video = (video.float().clamp(-1, 1) + 1.0) / 2.0
video = (video[0].permute(1, 2, 3, 0) * 255.0).round().to(torch.uint8) # [F, H, W, C]
return video.cpu()
@@ -0,0 +1,87 @@
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pre/post-processor pipelines for the LingBot-VA policy.
The preprocessor passes inputs through (IDENTITY) and the postprocessor maps the policy's
``[-1, 1]`` actions back to physical units with the built-in ``UnnormalizerProcessorStep``
(QUANTILES) using per-channel q01/q99 restored from the checkpoint.
"""
from typing import Any
import torch
from lerobot.configs.types import FeatureType, NormalizationMode
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyAction,
PolicyProcessorPipeline,
ProcessorStep,
RenameObservationsProcessorStep,
UnnormalizerProcessorStep,
)
from lerobot.processor.converters import policy_action_to_transition, transition_to_policy_action
from lerobot.utils.constants import (
POLICY_POSTPROCESSOR_DEFAULT_NAME,
POLICY_PREPROCESSOR_DEFAULT_NAME,
)
from .configuration_lingbot_va import LingBotVAConfig
def make_lingbot_va_pre_post_processors(
config: LingBotVAConfig,
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
"""Build the pre/post processor pipelines for LingBot-VA."""
input_steps: list[ProcessorStep] = [
RenameObservationsProcessorStep(rename_map={}),
AddBatchDimensionProcessorStep(),
NormalizerProcessorStep(
features={**config.input_features, **config.output_features},
norm_map=config.normalization_mapping,
stats=dataset_stats,
),
DeviceProcessorStep(device=config.device),
]
# Unnormalize actions from [-1, 1] to physical units (QUANTILES) using q01/q99 restored from the checkpoint.
output_steps: list[ProcessorStep] = [
UnnormalizerProcessorStep(
features=config.output_features,
norm_map={FeatureType.ACTION: NormalizationMode.QUANTILES},
stats=dataset_stats,
),
DeviceProcessorStep(device="cpu"),
]
return (
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
steps=input_steps,
name=POLICY_PREPROCESSOR_DEFAULT_NAME,
),
PolicyProcessorPipeline[PolicyAction, PolicyAction](
steps=output_steps,
name=POLICY_POSTPROCESSOR_DEFAULT_NAME,
to_transition=policy_action_to_transition,
to_output=transition_to_policy_action,
),
)
File diff suppressed because it is too large Load Diff
+6
View File
@@ -106,6 +106,8 @@ class DAggerKeyboardConfig:
pause_resume: str = "space"
correction: str = "tab"
upload: str = "enter"
success: str = "s"
failure: str = "f"
@dataclass
@@ -165,6 +167,10 @@ class DAggerStrategyConfig(RolloutStrategyConfig):
2. **correction** toggle human correction recording.
3. **upload** push dataset to hub on demand (corrections-only mode).
Episode success labeling:
4. **success** mark current episode as successful.
5. **failure** mark current episode as failed.
When ``record_autonomous=False`` (default) only human-correction windows
are recorded each correction becomes its own episode. Set to ``True``
to record both autonomous and correction frames with size-based episode
+5
View File
@@ -350,6 +350,11 @@ def build_rollout_context(
"shape": (1,),
"names": None,
}
dataset_features["next.success"] = {
"dtype": "bool",
"shape": (1,),
"names": None,
}
repo_name = cfg.dataset.repo_id.split("/", 1)[-1]
if not repo_name.startswith("rollout_"):
+116 -16
View File
@@ -112,6 +112,11 @@ class DAggerEvents:
# Session-level flags
self.stop_recording = Event()
self.upload_requested = Event()
# Set when operator presses success/failure key to end the current episode.
self.save_episode_requested = Event()
# Episode success labeling
self._episode_success: bool | None = None
# -- Thread-safe phase access ------------------------------------------
@@ -155,7 +160,26 @@ class DAggerEvents:
with self._lock:
self._phase = DAggerPhase.AUTONOMOUS
self._pending_transition = None
self._episode_success = None
self.upload_requested.clear()
self.save_episode_requested.clear()
def mark_success(self) -> None:
"""Mark the current episode as successful (called from input threads)."""
with self._lock:
self._episode_success = True
def mark_failure(self) -> None:
"""Mark the current episode as failed (called from input threads)."""
with self._lock:
self._episode_success = False
def consume_episode_success(self) -> bool | None:
"""Consume and reset the episode success label. Returns None if unlabeled."""
with self._lock:
result = self._episode_success
self._episode_success = None
return result
# ---------------------------------------------------------------------------
@@ -186,12 +210,20 @@ def _init_dagger_keyboard(events: DAggerEvents, cfg: DAggerKeyboardConfig):
events.request_transition(key_to_event[name])
if name == cfg.upload:
events.upload_requested.set()
if name == cfg.success:
events.mark_success()
events.save_episode_requested.set()
logger.info("Episode marked as SUCCESS — saving")
if name == cfg.failure:
events.mark_failure()
events.save_episode_requested.set()
logger.info("Episode marked as FAILURE — saving")
return create_key_listener(
dispatch,
controls_help=(
f"pause_resume='{cfg.pause_resume}', correction='{cfg.correction}', "
f"upload='{cfg.upload}', ESC=stop"
f"upload='{cfg.upload}', success='{cfg.success}', failure='{cfg.failure}', ESC=stop"
),
)
@@ -313,6 +345,32 @@ class DAggerStrategy(RolloutStrategy):
)
logger.info("DAgger strategy teardown complete")
# ------------------------------------------------------------------
# Episode success labeling
# ------------------------------------------------------------------
def _stamp_episode_success(self, dataset) -> None:
"""Set next.success on the terminal frame based on operator label.
Called just before save_episode(). If the operator pressed the success
key during this episode, the last frame's next.success is set to True.
Otherwise all frames remain False (unlabeled = assumed failure).
"""
buf = dataset.writer.episode_buffer
if buf is None:
return
success_buf = buf.get("next.success")
if not success_buf:
return
label = self._events.consume_episode_success()
logger.info("_stamp_episode_success: label=%s, buffer_len=%d", label, len(success_buf))
if label:
success_buf[-1] = np.array([True], dtype=bool)
logger.info("Terminal frame stamped next.success=True")
# ------------------------------------------------------------------
# Continuous recording mode (record_autonomous=True)
# ------------------------------------------------------------------
@@ -350,7 +408,12 @@ class DAggerStrategy(RolloutStrategy):
episode_start = time.perf_counter()
episodes_since_push = 0
episode_duration_s = self._episode_duration_s
logger.info("DAgger continuous recording started (episode_duration=%.0fs)", episode_duration_s)
num_episodes = self.config.num_episodes
logger.info(
"DAgger continuous recording started (episode_duration=%.0fs, target=%s eps)",
episode_duration_s,
num_episodes if num_episodes is not None else "",
)
with VideoEncodingManager(dataset):
try:
@@ -399,6 +462,7 @@ class DAggerStrategy(RolloutStrategy):
**action_frame,
"task": task_str,
"intervention": np.array([True], dtype=bool),
"next.success": np.array([False], dtype=bool),
}
dataset.add_frame(frame)
record_tick += 1
@@ -427,23 +491,32 @@ class DAggerStrategy(RolloutStrategy):
**action_frame,
"task": task_str,
"intervention": np.array([False], dtype=bool),
"next.success": np.array([False], dtype=bool),
}
dataset.add_frame(frame)
record_tick += 1
# Episode rotation derived from the video file-size target.
# Saving is deferred while a correction is ongoing so the
# episode boundary lands on a clean autonomous frame.
# Episode rotation: either the operator pressed success/failure,
# or the video file-size target was reached.
# Defer the save while a correction is ongoing so the episode
# boundary lands on a clean autonomous frame. The event stays
# set until we actually save, so it won't be lost.
manual_save = events.save_episode_requested.is_set()
elapsed = time.perf_counter() - episode_start
if elapsed >= episode_duration_s and phase != DAggerPhase.CORRECTING:
if (manual_save or elapsed >= episode_duration_s) and phase != DAggerPhase.CORRECTING:
if manual_save:
events.save_episode_requested.clear()
with self._episode_lock:
self._stamp_episode_success(dataset)
dataset.save_episode()
episodes_since_push += 1
self._needs_push.set()
save_reason = "manual save" if manual_save else f"elapsed {elapsed:.1f}s"
logger.info(
"Episode saved (total: %d, elapsed: %.1fs)",
"Episode saved (%s, total: %d)",
save_reason,
dataset.num_episodes,
elapsed,
)
log_say(f"Episode {dataset.num_episodes} saved", play_sounds)
@@ -451,6 +524,25 @@ class DAggerStrategy(RolloutStrategy):
self._background_push(dataset, cfg)
episodes_since_push = 0
if num_episodes is not None and dataset.num_episodes >= num_episodes:
logger.info("Target episode count reached (%d), stopping session", num_episodes)
log_say(f"All {num_episodes} episodes collected", play_sounds)
events.stop_recording.set()
break
# Pause after manual save: stop the policy, return robot to
# initial position, and wait for the operator to reset the
# environment and press SPACE.
if manual_save:
engine.pause()
events.phase = DAggerPhase.PAUSED
self._return_to_initial_position(ctx.hardware)
last_action = None
logger.info(
"Episode saved — paused for environment reset. Press SPACE to start next episode."
)
log_say("Reset the environment, then press space", play_sounds)
episode_start = time.perf_counter()
dt = time.perf_counter() - loop_start
@@ -465,10 +557,13 @@ class DAggerStrategy(RolloutStrategy):
logger.info("DAgger continuous control loop ended — pausing engine")
engine.pause()
with contextlib.suppress(Exception):
with self._episode_lock:
dataset.save_episode()
self._needs_push.set()
logger.info("Final in-progress episode saved")
buf = dataset.writer.episode_buffer
if buf and any(len(v) > 0 for v in buf.values() if isinstance(v, list)):
with self._episode_lock:
self._stamp_episode_success(dataset)
dataset.save_episode()
self._needs_push.set()
logger.info("Final in-progress episode saved")
# ------------------------------------------------------------------
# Corrections-only mode (record_autonomous=False)
@@ -540,6 +635,7 @@ class DAggerStrategy(RolloutStrategy):
# Correction ended -> save episode (blocking if not streaming)
if old_phase == DAggerPhase.CORRECTING and new_phase == DAggerPhase.PAUSED:
with self._episode_lock:
self._stamp_episode_success(dataset)
dataset.save_episode()
recorded += 1
self._needs_push.set()
@@ -581,6 +677,7 @@ class DAggerStrategy(RolloutStrategy):
**action_frame,
"task": task_str,
"intervention": np.array([True], dtype=bool),
"next.success": np.array([False], dtype=bool),
}
)
record_tick += 1
@@ -614,10 +711,13 @@ class DAggerStrategy(RolloutStrategy):
logger.info("DAgger corrections-only loop ended — pausing engine")
engine.pause()
with contextlib.suppress(Exception):
with self._episode_lock:
dataset.save_episode()
self._needs_push.set()
logger.info("Final in-progress episode saved")
buf = dataset.writer.episode_buffer
if buf and any(len(v) > 0 for v in buf.values() if isinstance(v, list)):
with self._episode_lock:
self._stamp_episode_success(dataset)
dataset.save_episode()
self._needs_push.set()
logger.info("Final in-progress episode saved")
# ------------------------------------------------------------------
# State-machine transition side-effects
+90 -11
View File
@@ -169,6 +169,7 @@ def rollout(
env_features: dict | None = None,
recording_repo_id: str | None = None,
recording_private: bool = False,
predicted_latents_callback: Callable[[PreTrainedPolicy], None] | None = None,
) -> dict:
"""Run a batched policy rollout once through a batch of environments.
@@ -198,6 +199,9 @@ def rollout(
are returned optionally because they typically take more memory to cache. Defaults to False.
render_callback: Optional rendering callback to be used after the environments are reset, and after
every step.
predicted_latents_callback: Optional callback invoked after every ``select_action`` with the policy
itself. World-model policies (e.g. LingBot-VA) stash predicted video latents on
``policy.last_predicted_latents``; this lets the caller concatenate chunks and decode once.
Returns:
The dictionary described above.
"""
@@ -276,6 +280,8 @@ def rollout(
observation = preprocessor(observation)
with torch.inference_mode():
action = policy.select_action(observation)
if predicted_latents_callback is not None:
predicted_latents_callback(policy)
action = postprocessor(action)
action_transition = {ACTION: action}
@@ -295,12 +301,22 @@ def rollout(
# available if none of the envs finished.
if "final_info" in info:
final_info = info["final_info"]
if not isinstance(final_info, dict):
raise RuntimeError(
"Unsupported `final_info` format: expected dict (Gymnasium >= 1.0). "
"You're likely using an older version of gymnasium (< 1.0). Please upgrade."
if isinstance(final_info, dict):
is_success = final_info.get("is_success", [False] * env.num_envs)
successes = (
is_success.tolist()
if hasattr(is_success, "tolist")
else [bool(is_success)] * env.num_envs
)
successes = final_info["is_success"].tolist()
else:
# Gymnasium < 1.0 returns final_info as a per-env sequence/object array,
# with entries set to a dict only for envs that just finished.
successes = []
for item in final_info:
if isinstance(item, dict) and "is_success" in item:
successes.append(bool(item["is_success"]))
else:
successes.append(False)
elif "is_success" in info:
is_success = info["is_success"]
successes = (
@@ -400,6 +416,7 @@ def eval_policy(
env_features: dict | None = None,
recording_repo_id: str | None = None,
recording_private: bool = False,
save_predicted_video: bool = False,
) -> dict:
"""
Args:
@@ -418,6 +435,11 @@ def eval_policy(
if max_episodes_rendered > 0 and not videos_dir:
raise ValueError("If max_episodes_rendered > 0, videos_dir must be provided.")
# World-model policies (e.g. LingBot-VA) opt into predicted-video saving via their config.
save_predicted_video = save_predicted_video or bool(
getattr(getattr(policy, "config", None), "save_predicted_video", False)
)
if not isinstance(policy, PreTrainedPolicy):
exc = ValueError(
f"Policy of type 'PreTrainedPolicy' is expected, but type '{type(policy)}' was provided."
@@ -461,6 +483,22 @@ def eval_policy(
if max_episodes_rendered > 0:
video_paths: list[str] = []
if save_predicted_video:
if not videos_dir:
raise ValueError("If save_predicted_video is True, videos_dir must be provided.")
predicted_video_paths: list[str] = []
n_predicted_rendered = 0
# Collect predicted-video latents across a rollout (world-model policies only). The latents are
# concatenated and decoded once after the rollout, matching upstream LingBot-VA's visualization path.
def collect_predicted_latents(policy: PreTrainedPolicy):
latents = getattr(policy, "last_predicted_latents", None)
if latents is not None:
pred_latents.append(
latents.detach().to("cpu") if hasattr(latents, "detach") else torch.as_tensor(latents).cpu()
)
policy.last_predicted_latents = None
if return_episode_data:
episode_data: dict | None = None
@@ -472,6 +510,9 @@ def eval_policy(
if max_episodes_rendered > 0:
ep_frames: list[np.ndarray] = []
if save_predicted_video:
pred_latents: list[torch.Tensor] = []
if start_seed is None:
seeds = None
else:
@@ -492,6 +533,7 @@ def eval_policy(
env_features=env_features,
recording_repo_id=recording_repo_id,
recording_private=recording_private,
predicted_latents_callback=collect_predicted_latents if save_predicted_video else None,
)
# Figure out where in each rollout sequence the first done condition was encountered (results after
@@ -557,6 +599,35 @@ def eval_policy(
threads.append(thread)
n_episodes_rendered += 1
# Maybe save the policy's predicted (imagined) video for this batch's rollout.
if save_predicted_video and len(pred_latents) > 0:
predicted_latent = torch.cat(pred_latents, dim=2)
decoder = getattr(policy, "decode_predicted_latents", None) or getattr(
policy, "_decode_predicted_video", None
)
if decoder is None:
raise AttributeError(
"Policy config requested predicted-video saving, but the policy does not expose "
"`decode_predicted_latents` or `_decode_predicted_video`."
)
predicted_video = decoder(predicted_latent)
if hasattr(predicted_video, "detach"):
predicted_video = predicted_video.detach().to("cpu").numpy()
videos_dir.mkdir(parents=True, exist_ok=True)
predicted_video_path = videos_dir / f"pred_episode_{n_predicted_rendered}.mp4"
predicted_video_paths.append(str(predicted_video_path))
thread = threading.Thread(
target=write_video,
args=(
str(predicted_video_path),
predicted_video,
env.unwrapped.metadata["render_fps"],
),
)
thread.start()
threads.append(thread)
n_predicted_rendered += 1
progbar.set_postfix(
{"running_success_rate": f"{np.mean(all_successes[:n_episodes]).item() * 100:.1f}%"}
)
@@ -600,6 +671,9 @@ def eval_policy(
if max_episodes_rendered > 0:
info["video_paths"] = video_paths
if save_predicted_video:
info["predicted_video_paths"] = predicted_video_paths
return info
@@ -740,9 +814,10 @@ class TaskMetrics(TypedDict):
max_rewards: list[float]
successes: list[bool]
video_paths: list[str]
predicted_video_paths: list[str]
ACC_KEYS = ("sum_rewards", "max_rewards", "successes", "video_paths")
ACC_KEYS = ("sum_rewards", "max_rewards", "successes", "video_paths", "predicted_video_paths")
def eval_one(
@@ -791,6 +866,7 @@ def eval_one(
max_rewards=[ep["max_reward"] for ep in per_episode],
successes=[ep["success"] for ep in per_episode],
video_paths=task_result.get("video_paths", []),
predicted_video_paths=task_result.get("predicted_video_paths", []),
)
@@ -851,6 +927,7 @@ def run_one(
if max_episodes_rendered > 0:
metrics.setdefault("video_paths", [])
metrics.setdefault("predicted_video_paths", [])
return task_group, task_id, metrics
@@ -908,11 +985,11 @@ def eval_policy_all(
_append("sum_rewards", metrics.get("sum_rewards"))
_append("max_rewards", metrics.get("max_rewards"))
_append("successes", metrics.get("successes"))
# video_paths is list-like
paths = metrics.get("video_paths", [])
if paths:
group_acc[group]["video_paths"].extend(paths)
overall["video_paths"].extend(paths)
for key in ("video_paths", "predicted_video_paths"):
paths = metrics.get(key, [])
if paths:
group_acc[group][key].extend(paths)
overall[key].extend(paths)
# Choose runner (sequential vs threaded)
task_runner = partial(
@@ -984,6 +1061,7 @@ def eval_policy_all(
"pc_success": _agg_from_list(acc["successes"]) * 100 if acc["successes"] else float("nan"),
"n_episodes": len(acc["sum_rewards"]),
"video_paths": list(acc["video_paths"]),
"predicted_video_paths": list(acc["predicted_video_paths"]),
}
# overall aggregates
@@ -995,6 +1073,7 @@ def eval_policy_all(
"eval_s": time.time() - start_t,
"eval_ep_s": (time.time() - start_t) / max(1, len(overall["sum_rewards"])),
"video_paths": list(overall["video_paths"]),
"predicted_video_paths": list(overall["predicted_video_paths"]),
}
return {
@@ -0,0 +1,78 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import pytest
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.policies.lingbot_va.configuration_lingbot_va import LingBotVAConfig
from lerobot.utils.constants import ACTION, OBS_IMAGES
def make_config(**overrides) -> LingBotVAConfig:
kwargs = {"device": "cpu"}
kwargs.update(overrides)
return LingBotVAConfig(**kwargs)
def test_registered_in_choice_registry() -> None:
assert "lingbot_va" in PreTrainedConfig.get_known_choices()
assert PreTrainedConfig.get_choice_class("lingbot_va") is LingBotVAConfig
def test_type_property() -> None:
assert make_config().type == "lingbot_va"
def test_chunk_size_and_action_steps() -> None:
cfg = make_config(frame_chunk_size=4, action_per_frame=4)
assert cfg.chunk_size == 16
assert cfg.n_action_steps == 16
assert cfg.action_delta_indices == list(range(16))
assert cfg.observation_delta_indices == list(range(16))
assert cfg.reward_delta_indices is None
def test_optimizer_and_scheduler_presets() -> None:
cfg = make_config()
opt = cfg.get_optimizer_preset()
assert opt.lr == cfg.optimizer_lr
sched = cfg.get_scheduler_preset()
assert sched.num_warmup_steps == cfg.scheduler_warmup_steps
def test_validate_features_sets_action_feature() -> None:
cfg = make_config()
cfg.input_features = {f"{OBS_IMAGES}.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 128, 128))}
cfg.output_features = {}
cfg.validate_features()
assert ACTION in cfg.output_features
assert cfg.output_features[ACTION].shape == (len(cfg.used_action_channel_ids),)
def test_validate_features_no_visual_raises() -> None:
cfg = make_config()
cfg.input_features = {}
cfg.output_features = {}
with pytest.raises(ValueError, match="at least one visual input feature"):
cfg.validate_features()
def test_invalid_attn_mode_raises() -> None:
with pytest.raises(ValueError, match="attn_mode"):
make_config(attn_mode="banana")
+38
View File
@@ -0,0 +1,38 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import pytest
from lerobot.policies.factory import make_policy_config
from lerobot.policies.lingbot_va.configuration_lingbot_va import LingBotVAConfig
def test_make_policy_config_returns_lingbot_va() -> None:
cfg = make_policy_config("lingbot_va", device="cpu")
assert isinstance(cfg, LingBotVAConfig)
def test_get_policy_class_resolves_lazily() -> None:
# Importing the policy class pulls in diffusers (Wan2.2 stack); skip if unavailable.
pytest.importorskip("diffusers")
pytest.importorskip("transformers")
from lerobot.policies.factory import get_policy_class
cls = get_policy_class("lingbot_va")
assert cls.name == "lingbot_va"
assert cls.config_class is LingBotVAConfig
+128
View File
@@ -0,0 +1,128 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for the vendored LingBot-VA helper code (scheduler + grid utilities)."""
from __future__ import annotations
import pytest
import torch
pytest.importorskip("diffusers") # the model code lives in modeling_lingbot_va, which imports diffusers
from lerobot.policies.lingbot_va.modeling_lingbot_va import FlowMatchScheduler
from lerobot.policies.lingbot_va.utils import data_seq_to_patch, get_mesh_id
def test_flow_match_scheduler_timesteps_monotone_decreasing() -> None:
sch = FlowMatchScheduler(shift=5.0, sigma_min=0.0, extra_one_step=True)
sch.set_timesteps(20)
assert sch.timesteps.shape == (20,)
diffs = sch.timesteps[1:] - sch.timesteps[:-1]
assert torch.all(diffs <= 0) # decreasing
def test_flow_match_scheduler_step_preserves_shape() -> None:
sch = FlowMatchScheduler(shift=5.0, sigma_min=0.0, extra_one_step=True)
sch.set_timesteps(20)
sample = torch.zeros(1, 48, 4, 8, 16)
out = sch.step(torch.ones_like(sample), sch.timesteps[0], sample)
assert out.shape == sample.shape
def test_flow_match_scheduler_add_noise() -> None:
sch = FlowMatchScheduler(shift=5.0, sigma_min=0.0, extra_one_step=True)
sch.set_timesteps(20)
sample = torch.randn(1, 48, 4, 8, 16)
noise = torch.randn_like(sample)
noisy = sch.add_noise(sample, noise, sch.timesteps[:4], t_dim=2)
assert noisy.shape == sample.shape
def test_get_mesh_id_latent_shape() -> None:
grid = get_mesh_id(4, 8, 16, 0, 1, 0)
assert grid.shape == (4, 4 * 8 * 16) # (f, h, w, stream) x tokens
def test_get_mesh_id_action_shape() -> None:
grid = get_mesh_id(4, 4, 1, 1, 1, 0, action=True)
assert grid.shape == (4, 4 * 4 * 1)
# Action rows for h/w are sentinel -1.
assert torch.all(grid[1] < 0)
assert torch.all(grid[2] < 0)
def test_data_seq_to_patch_roundtrip_shape() -> None:
b, f, h, w, c = 1, 4, 8, 16, 48
seq = torch.arange(b * f * h * w * c, dtype=torch.float32).reshape(b, f * h * w, c)
out = data_seq_to_patch((1, 2, 2), seq, f, h, w, batch_size=b)
assert out.shape == (b, c, f, h, w)
def test_training_step_reduces_loss_tiny_flex() -> None:
"""End-to-end single training step (flow-matching loss -> backward -> AdamW) on a tiny config.
Exercises the flex-attention training path; requires a CUDA GPU with flex-attention support.
"""
if not torch.cuda.is_available():
import pytest
pytest.skip("training step test requires a CUDA GPU (flex-attention)")
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.policies.lingbot_va.configuration_lingbot_va import LingBotVAConfig
from lerobot.policies.lingbot_va.modeling_lingbot_va import LingBotVAPolicy
from lerobot.utils.constants import ACTION, OBS_IMAGES
cfg = LingBotVAConfig(
attn_mode="flex",
dtype="bfloat16",
in_channels=16,
out_channels=16,
action_dim=8,
text_dim=32,
freq_dim=64,
ffn_dim=64,
num_attention_heads=2,
attention_head_dim=24,
num_layers=2,
frame_chunk_size=2,
action_per_frame=4,
used_action_channel_ids=[0, 1, 2, 3],
obs_cam_keys=[f"{OBS_IMAGES}.image"],
device="cuda",
)
cfg.input_features = {f"{OBS_IMAGES}.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 64, 64))}
cfg.output_features = {ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(4,))}
cfg.validate_features()
policy = LingBotVAPolicy(cfg).to("cuda")
policy.train()
opt = torch.optim.AdamW(policy.get_optim_params(), lr=1e-4)
b, fc, apf = 1, cfg.frame_chunk_size, cfg.action_per_frame
latents = torch.randn(b, cfg.in_channels, fc, 4, 4, device="cuda", dtype=torch.bfloat16)
actions = torch.randn(b, cfg.action_dim, fc, apf, 1, device="cuda", dtype=torch.bfloat16)
amask = torch.zeros(cfg.action_dim, device="cuda")
amask[cfg.used_action_channel_ids] = 1.0
actions_mask = amask.view(1, -1, 1, 1, 1).expand_as(actions)
text_emb = torch.randn(b, cfg.max_sequence_length, cfg.text_dim, device="cuda", dtype=torch.bfloat16)
loss, metrics = policy.training_loss_from_streams(latents, actions, actions_mask, text_emb)
assert torch.isfinite(loss) and {"latent_loss", "action_loss"} <= set(metrics)
loss.backward()
assert any(p.grad is not None and torch.isfinite(p.grad).all() for p in policy.get_optim_params())
opt.step()
@@ -0,0 +1,88 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import torch
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.policies.lingbot_va.configuration_lingbot_va import LingBotVAConfig
from lerobot.policies.lingbot_va.processor_lingbot_va import make_lingbot_va_pre_post_processors
from lerobot.processor import PolicyProcessorPipeline, UnnormalizerProcessorStep
from lerobot.processor.converters import policy_action_to_transition, transition_to_policy_action
from lerobot.utils.constants import (
ACTION,
OBS_IMAGES,
POLICY_POSTPROCESSOR_DEFAULT_NAME,
POLICY_PREPROCESSOR_DEFAULT_NAME,
)
def _make_config() -> LingBotVAConfig:
cfg = LingBotVAConfig(device="cpu")
cfg.input_features = {f"{OBS_IMAGES}.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 128, 128))}
cfg.output_features = {}
cfg.validate_features()
return cfg
def test_make_pre_post_processors_names_and_steps() -> None:
cfg = _make_config()
pre, post = make_lingbot_va_pre_post_processors(cfg, dataset_stats=None)
assert pre.name == POLICY_PREPROCESSOR_DEFAULT_NAME
assert post.name == POLICY_POSTPROCESSOR_DEFAULT_NAME
# Actions are unnormalized by the standard built-in quantile unnormalizer.
assert any(isinstance(s, UnnormalizerProcessorStep) for s in post.steps)
def test_freshly_built_postprocessor_is_identity() -> None:
# Without action stats the quantile unnormalizer is a no-op (identity passthrough): the real
# per-benchmark q01/q99 are restored from the saved checkpoint on load, not hardcoded here.
cfg = _make_config()
_, post = make_lingbot_va_pre_post_processors(cfg, dataset_stats=None)
normed = torch.tensor([[0.3, -0.5, 1.0, -1.0, 0.0, 0.7, -0.2]])
assert torch.allclose(post(normed), normed, atol=1e-6)
def test_postprocessor_quantile_unnormalization() -> None:
# QUANTILES unnormalize maps [-1, 1] -> [q01, q99]: -1 -> q01, +1 -> q99.
cfg = _make_config()
q01 = [-1.0, -0.5, 0.0, -1.0, -1.0, -1.0, -1.0]
q99 = [1.0, 0.5, 2.0, 1.0, 1.0, 1.0, 1.0]
stats = {ACTION: {"q01": q01, "q99": q99}}
_, post = make_lingbot_va_pre_post_processors(cfg, dataset_stats=stats)
out_lo = post(torch.full((1, 7), -1.0))
out_hi = post(torch.full((1, 7), 1.0))
assert torch.allclose(out_lo, torch.tensor(q01).unsqueeze(0), atol=1e-4)
assert torch.allclose(out_hi, torch.tensor(q99).unsqueeze(0), atol=1e-4)
def test_postprocessor_stats_survive_save_load(tmp_path) -> None:
# Regression guard for the Hub mechanism: the q01/q99 stats live in the saved post-processor
# state and must round-trip through save_pretrained / from_pretrained.
cfg = _make_config()
q01 = [-0.6, -0.8, -0.9, -0.1, -0.15, -0.25, -1.0]
q99 = [0.9, 0.85, 0.9, 0.17, 0.18, 0.34, 1.0]
_, post = make_lingbot_va_pre_post_processors(cfg, dataset_stats={ACTION: {"q01": q01, "q99": q99}})
post.save_pretrained(tmp_path)
loaded = PolicyProcessorPipeline.from_pretrained(
tmp_path,
config_filename=f"{POLICY_POSTPROCESSOR_DEFAULT_NAME}.json",
to_transition=policy_action_to_transition,
to_output=transition_to_policy_action,
)
out = loaded(torch.full((1, 7), -1.0))
assert torch.allclose(out, torch.tensor(q01).unsqueeze(0), atol=1e-4)
Generated
+10 -1
View File
@@ -3057,6 +3057,11 @@ libero = [
{ name = "torchcodec", marker = "(platform_machine == 'arm64' and sys_platform == 'darwin') or (platform_machine == 'AMD64' and sys_platform == 'linux') or (platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'arm64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux') or sys_platform == 'win32'" },
{ name = "transformers" },
]
lingbot-va = [
{ name = "accelerate" },
{ name = "diffusers" },
{ name = "transformers" },
]
matplotlib-dep = [
{ name = "contourpy" },
{ name = "matplotlib" },
@@ -3265,6 +3270,7 @@ requires-dist = [
{ name = "ipykernel", marker = "extra == 'notebook'", specifier = ">=6.0.0,<7.0.0" },
{ name = "jsonlines", marker = "extra == 'dataset'", specifier = ">=4.0.0,<5.0.0" },
{ name = "jupyter", marker = "extra == 'notebook'", specifier = ">=1.0.0,<2.0.0" },
{ name = "lerobot", extras = ["accelerate-dep"], marker = "extra == 'lingbot-va'" },
{ name = "lerobot", extras = ["accelerate-dep"], marker = "extra == 'smolvla'" },
{ name = "lerobot", extras = ["accelerate-dep"], marker = "extra == 'training'" },
{ name = "lerobot", extras = ["aloha"], marker = "extra == 'all'" },
@@ -3292,6 +3298,7 @@ requires-dist = [
{ name = "lerobot", extras = ["diffusers-dep"], marker = "extra == 'diffusion'" },
{ name = "lerobot", extras = ["diffusers-dep"], marker = "extra == 'fastwam'" },
{ name = "lerobot", extras = ["diffusers-dep"], marker = "extra == 'groot'" },
{ name = "lerobot", extras = ["diffusers-dep"], marker = "extra == 'lingbot-va'" },
{ name = "lerobot", extras = ["diffusers-dep"], marker = "extra == 'multi-task-dit'" },
{ name = "lerobot", extras = ["diffusers-dep"], marker = "extra == 'vla-jepa'" },
{ name = "lerobot", extras = ["diffusion"], marker = "extra == 'all'" },
@@ -3312,6 +3319,7 @@ requires-dist = [
{ name = "lerobot", extras = ["kinematics"], marker = "extra == 'all'" },
{ name = "lerobot", extras = ["lekiwi"], marker = "extra == 'all'" },
{ name = "lerobot", extras = ["libero"], marker = "sys_platform == 'linux' and extra == 'all'" },
{ name = "lerobot", extras = ["lingbot-va"], marker = "extra == 'all'" },
{ name = "lerobot", extras = ["matplotlib-dep"], marker = "extra == 'async'" },
{ name = "lerobot", extras = ["matplotlib-dep"], marker = "extra == 'sarm'" },
{ name = "lerobot", extras = ["matplotlib-dep"], marker = "extra == 'unitree-g1'" },
@@ -3370,6 +3378,7 @@ requires-dist = [
{ name = "lerobot", extras = ["transformers-dep"], marker = "extra == 'groot'" },
{ name = "lerobot", extras = ["transformers-dep"], marker = "extra == 'hilserl'" },
{ name = "lerobot", extras = ["transformers-dep"], marker = "extra == 'libero'" },
{ name = "lerobot", extras = ["transformers-dep"], marker = "extra == 'lingbot-va'" },
{ name = "lerobot", extras = ["transformers-dep"], marker = "extra == 'molmoact2'" },
{ name = "lerobot", extras = ["transformers-dep"], marker = "extra == 'multi-task-dit'" },
{ name = "lerobot", extras = ["transformers-dep"], marker = "extra == 'peft'" },
@@ -3449,7 +3458,7 @@ requires-dist = [
{ name = "transformers", marker = "extra == 'transformers-dep'", specifier = ">=5.4.0,<5.6.0" },
{ name = "wandb", marker = "extra == 'training'", specifier = ">=0.24.0,<0.28.0" },
]
provides-extras = ["dataset", "training", "hardware", "viz", "core-scripts", "evaluation", "dataset-viz", "av-dep", "pygame-dep", "placo-dep", "transformers-dep", "grpcio-dep", "accelerate-dep", "can-dep", "peft-dep", "scipy-dep", "diffusers-dep", "qwen-vl-utils-dep", "matplotlib-dep", "pyserial-dep", "deepdiff-dep", "pynput-dep", "pyzmq-dep", "motorbridge-dep", "motorbridge-smart-servo-dep", "feetech", "dynamixel", "damiao", "robstride", "openarms", "gamepad", "hopejr", "lekiwi", "unitree-g1", "reachy2", "rebot", "kinematics", "intelrealsense", "phone", "diffusion", "wallx", "pi", "molmoact2", "smolvla", "multi-task-dit", "groot", "sarm", "robometer", "topreward", "xvla", "eo1", "fastwam", "hilserl", "vla-jepa", "async", "peft", "annotations", "dev", "notebook", "test", "video-benchmark", "aloha", "pusht", "libero", "metaworld", "all"]
provides-extras = ["dataset", "training", "hardware", "viz", "core-scripts", "evaluation", "dataset-viz", "av-dep", "pygame-dep", "placo-dep", "transformers-dep", "grpcio-dep", "accelerate-dep", "can-dep", "peft-dep", "scipy-dep", "diffusers-dep", "qwen-vl-utils-dep", "matplotlib-dep", "pyserial-dep", "deepdiff-dep", "pynput-dep", "pyzmq-dep", "motorbridge-dep", "motorbridge-smart-servo-dep", "feetech", "dynamixel", "damiao", "robstride", "openarms", "gamepad", "hopejr", "lekiwi", "unitree-g1", "reachy2", "rebot", "kinematics", "intelrealsense", "phone", "diffusion", "wallx", "pi", "molmoact2", "smolvla", "multi-task-dit", "groot", "sarm", "robometer", "topreward", "xvla", "eo1", "fastwam", "hilserl", "vla-jepa", "lingbot-va", "async", "peft", "annotations", "dev", "notebook", "test", "video-benchmark", "aloha", "pusht", "libero", "metaworld", "all"]
[[package]]
name = "librt"