Compare commits

36 Commits

Author SHA1 Message Date
Steven Palma 1ec9392bcb chore(style): pre-commit envs 2026-02-24 15:03:36 +01:00
Steven Palma 84b34ae75c Merge branch 'main' into envs/support-more-args
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-24 15:01:17 +01:00
Dominik Paľo 7fd71c83a3 docs: add WSL evdev installation note (#2855)
Add a note in the installation guide explaining that users on WSL need to install evdev to avoid build issues.
See: https://github.com/huggingface/lerobot/issues/2528

Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-23 20:41:20 +01:00
Yuan Haokuan 0f44adbeec docs: fix HF_USER export command to correctly parse username (#2932)
* Fix HF_USER extraction command in documentation

Updated command to extract the username from hf auth output.

Signed-off-by: Yuan Haokuan <138340416+WilbertYuan@users.noreply.github.com>

* Correct HF_USER variable assignment in documentation

Fix the variable extraction from hf auth output.

Signed-off-by: Yuan Haokuan <138340416+WilbertYuan@users.noreply.github.com>

* Update docs/source/il_robots.mdx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Yuan Haokuan <138340416+WilbertYuan@users.noreply.github.com>

---------

Signed-off-by: Yuan Haokuan <138340416+WilbertYuan@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-23 17:51:13 +01:00
Guilherme Miotto 7dbbaa3727 Small comment fix (#2990)
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-23 17:11:55 +01:00
Yuta Nakagawa fcabfd32a5 chore(docs): update the document for Phone teleop to clarify how to use the examples (#2991)
* update the document for Phone teleop to clarify how to use the examples

* Update docs/source/phone_teleop.mdx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Yuta Nakagawa <ytnkgw@gmail.com>

---------

Signed-off-by: Yuta Nakagawa <ytnkgw@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-23 17:11:46 +01:00
Steven Palma 544cbc5f38 feat(motors): add RobStride CAN implementation (#2821)
* feat(motors): add initial implementation of robstride

Co-authored-by: Virgile <virgilebatto@gmail.com>

* chore(motors): solve some linter

* remove kp/kd attribute

* code uniformisation between damiao and robstride

* remove normalization warning

* remove non valid baudrates and small docstring update

* remove all useless files. Only keeping robstride.py and table.py

* typing for mypy

* reduce NameOrId usage

* align signature with damiao

* use the same helper as in the damiao implementation

* bug fix: expect a response after each bus.send

---------

Co-authored-by: Virgile <virgilebatto@gmail.com>
2026-02-23 16:39:04 +01:00
Yueci Deng a0c5d19391 add metadata_buffer_size to dataset creation (#2998)
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-23 16:32:59 +01:00
Steven Palma e96339a3b4 feat(dataset): add streaming video encoding + HW encoder support (#2974)
* feat(dataset): init stream encoding

* feat(dataset): use threads to fix frame pickle latency

* refactor(dataset): remove HW encoded related changes

* add lp (#2977)

* feat(dataset): add Hw encoding + log drop frames (#2978)

* chore(docs): add streaming video encoding guide

* fix(dataset): style docs + testing

* chore(docs): simplify streaming video encoding guide

* chore(dataset): add commands + streaming encoding default false + print note if false + queue default is now 30

* chore(docs): add verification note advice

* chore(dataset): adjusting defaults & docs for streaming encoding

* docs(scripts): improve docstrings

* test(dataset): polish streaming encoding tests

* chore(dataset): move FYI log related to streaming

* chore(dataset): add arg vcodec to suggestions

* refactor(dataset): better handling for auto and available vcodec

* chore(dataset): change log level

* docs(dataset): add note related to training performance vcodec

* docs(dataset): add more notes to streaming encoding

---------

Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
Co-authored-by: Pepijn <pepijn@huggingface.co>
2026-02-23 13:57:43 +01:00
Steven Palma 5865170d36 chore(deps): bump ceil datasets (#2946) 2026-02-20 17:01:46 +01:00
Khalil 2dd366436e Fix gym-hil integration with the new LeRobot pipeline. (#2482)
* Add GymHILAdapterProcessorStep for gym-hil environment integration

* Fix action features in control loop for None teleop device with gym-hil

* Finalize dataset before pushing to hub for visualization on the hub

* Fix neutral action for gripper

* fix pre-commit
2026-02-19 14:35:02 +01:00
Steven Palma 5f15232271 chore: remove usernames + use entrypoints in docs, comments & sample commands (#2988) 2026-02-18 22:46:12 +01:00
Steven Palma bc38261321 feat(robots): use read_latest() camera (#2987)
* feat(robots): use read_latest() camera

* fix(test): add read_latest reachy cam mock
2026-02-18 20:05:15 +01:00
Caroline Pascal aaf3707058 fix(filtering): fixing episodes filtering in load_nested_dataset to always use .from_parquet() (#2982) 2026-02-18 19:16:53 +01:00
Steven Palma 89bd58a9a2 chore(scripts): warn if we don't respect the target FPS (#2986) 2026-02-18 18:22:35 +01:00
Steven Palma b22e0315b0 fix(utils): more conservative sleep_margin default value in precise_sleep (#2985) 2026-02-18 17:32:25 +01:00
HUANG TZU-CHUN fcbf550952 fix(docs): update environment variable name to HF_LEROBOT_HOME in docstring (#2973)
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-18 11:27:40 +01:00
Sota Nakamura af036ce57e fix(scripts): serve grpc for a web viewer (#2881)
* serve grpc for a web viewer

* add help

* remove ip detection

* fix comment

* pass grpc_port

* fix(CLI): fixing CLI display-compressed-images argument 1/2

Co-authored-by: HUANG TZU-CHUN <tzu.chun.huang.tw@gmail.com>
Signed-off-by: Caroline Pascal <caroline8.pascal@gmail.com>

* fix(CLI): fixing CLI display-compressed-images argument 2/2

Co-authored-by: HUANG TZU-CHUN <tzu.chun.huang.tw@gmail.com>
Signed-off-by: Caroline Pascal <caroline8.pascal@gmail.com>

---------

Signed-off-by: Caroline Pascal <caroline8.pascal@gmail.com>
Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
Co-authored-by: HUANG TZU-CHUN <tzu.chun.huang.tw@gmail.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-18 01:05:51 +01:00
Vladislav Sovrasov 1c388c0002 (Chore) Bump upper bound for torch version (#2897)
* Bump upper torch version bound

* Apply suggestion from @Copilot

Signed-off-by: Vladislav Sovrasov <vladislav.sovrasov@intel.com>

* Update ref state dicts for schedulers

* Support older than 2.8 torch versions

* Fix precommit

---------

Signed-off-by: Vladislav Sovrasov <vladislav.sovrasov@intel.com>
2026-02-17 23:37:46 +01:00
masato-ka 51d3822d75 feat(datasets): Add info operation to lerobot-edit-dataset command (#2917)
* Add new feature to lerobot_edit_dataset.py that shows dataset information.

* Fix draccus error that occurs when only --operation.type=info is given

* Updating test and documents regarding lerobot-edit-dataset info function.

* Updating documents regarding lerobot-edit-dataset extract function. The option name in the document was wrong.

* feat(datasets): Update to align formatting with pre-commit.(#2917)

Update to align formatting by pre-commit.

---------

Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
2026-02-17 20:09:42 +01:00
Pepijn 6600b60e7f always use degrees (#2968) 2026-02-13 13:49:01 +01:00
Caroline Pascal adebbcf090 fix(dataset tools draccus): fixing draccus parsing for dataset edit operation type specification (#2949)
* fix(edit dataset operation): fixing dataset tools CLI operation type specification

* test(edit dataset operation): adding tests for dataset tools operation type specification

* chore(format): running pre-commit

* chore(backward compatibility): adding a type property in OperationConfig for backward compatibility

Signed-off-by: Caroline Pascal <caroline8.pascal@gmail.com>
2026-02-12 18:56:04 +01:00
taken-yjyoon 3615160d89 fix(typo): Fixing wrong argparse examples in the comments (using 'True' not 'true') (#1040)
Co-authored-by: juni <>
2026-02-12 18:13:51 +01:00
Steven Palma fc8a388a25 feat(cameras): make backend configurable to the CLI (#2945)
* feat(cameras): make backend configurable to the CLI

* chore(cameras): address feedback

* feat(Enum error messages): adding better instantiation error messages for Enum classes

* chore(Enum error messages): propagating Enum error messages to all camera classes

* chore(comments): removing superfluous comments

* chore(format): applying ruff checks

---------

Co-authored-by: CarolinePascal <caroline8.pascal@gmail.com>
2026-02-11 13:57:25 +01:00
Steven Palma 3c84d271d5 fix(motors): use decorator to fix precommit (#2951) 2026-02-10 18:40:50 +01:00
Steven Palma 1ba3975020 chore: use is_connected decorators (#2948)
* chore: use is_connected decorators

* chore(robots): add is_connected to bi setups too
2026-02-10 17:49:30 +01:00
Steven Palma 35363c5798 chore(linter): ensure motors module passes MyPy type checks (#2939)
* fix: ensure motors module passes MyPy type checks

This commit fixes 62 mypy type errors in the motors module by:

- Updating Protocol classes (PortHandler, PacketHandler, GroupSyncRead,
  GroupSyncWrite) to use class-level attribute declarations instead of
  __init__ body declarations
- Adding missing `broadcastPing` method to PacketHandler Protocol
- Fixing return type annotations (e.g., `_get_motor_model` returns str, not int)
- Fixing parameter types to use `Sequence` for covariant list parameters
- Fixing `Mapping` for covariant dict value types in `_normalize`
- Updating method signatures to be consistent across parent and child classes
  (disable_torque, enable_torque, _get_half_turn_homings)
- Adding explicit `int()` casts for MotorCalibration arguments
- Adding explicit `return None` for functions returning Optional types
- Adding type annotations for variables like `data_list: dict[int, int]`
- Using `# type: ignore[method-assign]` for intentional monkeypatch
- Fixing variable references (using `self.groups` instead of `groups`)
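
As a rough illustration of the first bullet, the sketch below declares protocol members at class level instead of inside an `__init__` body. The names `PortHandlerLike`, `FakePort`, and `open_and_get_baudrate` are hypothetical and are not taken from the motors module; only the pattern is the point.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class PortHandlerLike(Protocol):
    # Class-level declarations: type checkers treat these as protocol members.
    is_open: bool
    baudrate: int

    def openPort(self) -> bool: ...


class FakePort:
    """Hypothetical stand-in that structurally satisfies the protocol."""

    is_open: bool = False
    baudrate: int = 1_000_000

    def openPort(self) -> bool:
        self.is_open = True
        return self.is_open


def open_and_get_baudrate(port: PortHandlerLike) -> int:
    # Accepts any object with the declared attributes and method.
    port.openPort()
    return port.baudrate
```

Because the members are declared on the class body, a structural implementation such as `FakePort` can be passed wherever `PortHandlerLike` is expected without inheriting from it.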

Fixes #1723

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* chore(style): pre-commit after main merge

* chore(linter): solve comments

* chore(linter): apply pre-commit fixes to damiao

* chore(linter): more fixes to damiao

---------

Co-authored-by: yurekami <yurekami@users.noreply.github.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-10 17:35:39 +01:00
whats2000 778db19a17 [Bug Fix] fix(ci): prevent runner group error on fork pushes (#2911)
* fix(ci): prevent runner group error on fork pushes

Add repository check to unbound_deps_tests workflow to ensure
aws-general-8-plus runner group is only used on main repository,
preventing 'Required runner group not found' errors on forks.

* fix(ci): use gating job to prevent runner allocation on forks

The previous approach failed because GitHub evaluates runs-on before if conditions.
Now using a check-repo job that runs on ubuntu-latest first, and all jobs with
special runners depend on it and check its output before being scheduled.

* fix(ci): add gating job to full_tests to prevent runner allocation on forks

Apply the same gating pattern used in unbound_deps_tests to full_tests.yml
to prevent GitHub from trying to allocate custom runners when workflows
run on forks. The check-repo job runs first on ubuntu-latest and all jobs
with custom runners depend on it and check its output.

* fix(ci): add repository check to unbound_deps_tests workflow

Add 'if: github.repository == huggingface/lerobot' check to build-and-push-docker job to prevent runner group access errors on forks, matching the pattern used in nightly.yml

* fix(ci): add repository check to full_tests workflow

Add 'if: github.repository == huggingface/lerobot' check to build-and-push-docker and gpu-tests jobs to prevent runner group access errors on forks

* refactor(ci): remove redundant check from gpu-tests job

gpu-tests depends on build-and-push-docker via needs, so it will automatically skip when the parent job is skipped

* refactor(ci): remove unnecessary fork check from full-tests job

full-tests runs on ubuntu-latest which is available to all forks, no need to restrict it

---------

Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-10 15:21:40 +01:00
Jai Kumaar Ratadia d2d01399d6 docs: clarify installation steps are sequential, not optional (#2925)
* docs: clarify installation steps are sequential, not optional

Add intro paragraph noting conda is one path (not the only one) and
number the three sections as steps so readers understand miniforge and
environment setup are prerequisites, not independent choices.

* Update installation guide link for LeRobot

Signed-off-by: Jai Kumaar Ratadia <jaikumaarratadia@gmail.com>

* Fix link formatting in installation guide again

Signed-off-by: Jai Kumaar Ratadia <jaikumaarratadia@gmail.com>

---------

Signed-off-by: Jai Kumaar Ratadia <jaikumaarratadia@gmail.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-10 15:18:32 +01:00
Aoqun Jin 5eba4ce6f4 Change LIBERO init_state_id when reset. (#2899)
* Change LIBERO init_state_id when reset.

Signed-off-by: Aoqun Jin <aojiaojiao@foxmail.com>

* Change LIBERO init_state_id when reset.

Signed-off-by: Aoqun Jin <aojiaojiao@foxmail.com>

* pre-commit run

---------

Signed-off-by: Aoqun Jin <aojiaojiao@foxmail.com>
Co-authored-by: Jade Choghari <chogharijade@gmail.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-10 16:39:17 +03:00
Stepan Feduniak cca0296cd6 fix(pipeline): use FeatureType for STATE features in Libero processor (#2888)
* fix the types

* pre-commit

---------

Co-authored-by: Jade Choghari <chogharijade@gmail.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-10 15:55:11 +03:00
Steven Palma 489cb7b6b9 fix(scripts): correct can import check (#2937) 2026-02-09 16:58:32 +01:00
Reece O'Mahoney e14bdf57d0 Convert tensors to scalars (#2903)
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-09 14:46:12 +01:00
Jade Choghari ff267c772b allow lerobot-eval to work with kwargs 2025-12-26 17:39:03 +00:00
Jade Choghari 652b1b854d Merge branch 'main' into envs/support-more-args 2025-12-23 16:07:22 +03:00
Jade Choghari 8831b3c47b add changes 2025-12-08 11:11:38 +01:00
115 changed files with 3550 additions and 10633 deletions
+5 -3
@@ -101,9 +101,11 @@ jobs:
runs-on:
group: aws-general-8-plus
if: |
-(github.event_name == 'pull_request_review' && github.event.review.state == 'approved' && github.event.pull_request.head.repo.fork == false) ||
-github.event_name == 'push' ||
-github.event_name == 'workflow_dispatch'
+github.repository == 'huggingface/lerobot' && (
+(github.event_name == 'pull_request_review' && github.event.review.state == 'approved' && github.event.pull_request.head.repo.fork == false) ||
+github.event_name == 'push' ||
+github.event_name == 'workflow_dispatch'
+)
outputs:
image_tag: ${{ steps.set_tag.outputs.image_tag }}
env:
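The guarded `if:` condition in the hunk above can be read as a plain boolean function. The sketch below is only a Python paraphrase of that expression for reasoning about it; the function name `should_run_build` and its parameters are hypothetical, and GitHub evaluates the real condition in its own expression language.

```python
def should_run_build(
    repository: str,
    event_name: str,
    review_state: str,
    head_repo_is_fork: bool,
) -> bool:
    """Paraphrase of the workflow condition: repository gate AND (event checks)."""
    # New outer gate: never run on forks of the main repository.
    if repository != "huggingface/lerobot":
        return False

    # Original condition, now nested inside the repository gate.
    approved_internal_review = (
        event_name == "pull_request_review"
        and review_state == "approved"
        and not head_repo_is_fork
    )
    return approved_internal_review or event_name in ("push", "workflow_dispatch")
```

With this reading, a `push` event on a fork is rejected by the outer gate, which is the case the commit message describes.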
+1
@@ -91,6 +91,7 @@ jobs:
name: Build and Push Docker
runs-on:
group: aws-general-8-plus
+if: github.repository == 'huggingface/lerobot'
outputs:
image_tag: ${{ env.DOCKER_IMAGE_NAME }}
env:
+42 -42
@@ -28,9 +28,9 @@ We don't expect the same optimal settings for a dataset of images from a simulat
For these reasons, we run this benchmark on four representative datasets:
- `lerobot/pusht_image`: (96 x 96 pixels) simulation with simple geometric shapes, fixed camera.
-- `aliberts/aloha_mobile_shrimp_image`: (480 x 640 pixels) real-world indoor, moving camera.
-- `aliberts/paris_street`: (720 x 1280 pixels) real-world outdoor, moving camera.
-- `aliberts/kitchen`: (1080 x 1920 pixels) real-world indoor, fixed camera.
+- `lerobot/aloha_mobile_shrimp_image`: (480 x 640 pixels) real-world indoor, moving camera.
+- `lerobot/paris_street`: (720 x 1280 pixels) real-world outdoor, moving camera.
+- `lerobot/kitchen`: (1080 x 1920 pixels) real-world indoor, fixed camera.
Note: The datasets used for this benchmark need to be image datasets, not video datasets.
@@ -179,7 +179,7 @@ python benchmark/video/run_video_benchmark.py \
--output-dir outputs/video_benchmark \
--repo-ids \
lerobot/pusht_image \
-aliberts/aloha_mobile_shrimp_image \
+lerobot/aloha_mobile_shrimp_image \
--vcodec libx264 libx265 \
--pix-fmt yuv444p yuv420p \
--g 2 20 None \
@@ -203,9 +203,9 @@ python benchmark/video/run_video_benchmark.py \
--output-dir outputs/video_benchmark \
--repo-ids \
lerobot/pusht_image \
-aliberts/aloha_mobile_shrimp_image \
-aliberts/paris_street \
-aliberts/kitchen \
+lerobot/aloha_mobile_shrimp_image \
+lerobot/paris_street \
+lerobot/kitchen \
--vcodec libx264 libx265 \
--pix-fmt yuv444p yuv420p \
--g 1 2 3 4 5 6 10 15 20 40 None \
@@ -221,9 +221,9 @@ python benchmark/video/run_video_benchmark.py \
--output-dir outputs/video_benchmark \
--repo-ids \
lerobot/pusht_image \
-aliberts/aloha_mobile_shrimp_image \
-aliberts/paris_street \
-aliberts/kitchen \
+lerobot/aloha_mobile_shrimp_image \
+lerobot/paris_street \
+lerobot/kitchen \
--vcodec libsvtav1 \
--pix-fmt yuv420p \
--g 1 2 3 4 5 6 10 15 20 40 None \
@@ -252,37 +252,37 @@ Since we're using av1 encoding, we're choosing the `pyav` decoder as `video_read
These tables show the results for `g=2` and `crf=30`, using `timestamps-modes=6_frames` and `backend=pyav`
-| video_images_size_ratio | vcodec | pix_fmt | | | |
-| ---------------------------------- | ---------- | ------- | --------- | --------- | --------- |
-| | libx264 | | libx265 | | libsvtav1 |
-| repo_id | yuv420p | yuv444p | yuv420p | yuv444p | yuv420p |
-| lerobot/pusht_image | **16.97%** | 17.58% | 18.57% | 18.86% | 22.06% |
-| aliberts/aloha_mobile_shrimp_image | 2.14% | 2.11% | 1.38% | **1.37%** | 5.59% |
-| aliberts/paris_street | 2.12% | 2.13% | **1.54%** | **1.54%** | 4.43% |
-| aliberts/kitchen | 1.40% | 1.39% | **1.00%** | **1.00%** | 2.52% |
+| video_images_size_ratio | vcodec | pix_fmt | | | |
+| --------------------------------- | ---------- | ------- | --------- | --------- | --------- |
+| | libx264 | | libx265 | | libsvtav1 |
+| repo_id | yuv420p | yuv444p | yuv420p | yuv444p | yuv420p |
+| lerobot/pusht_image | **16.97%** | 17.58% | 18.57% | 18.86% | 22.06% |
+| lerobot/aloha_mobile_shrimp_image | 2.14% | 2.11% | 1.38% | **1.37%** | 5.59% |
+| lerobot/paris_street | 2.12% | 2.13% | **1.54%** | **1.54%** | 4.43% |
+| lerobot/kitchen | 1.40% | 1.39% | **1.00%** | **1.00%** | 2.52% |
-| video_images_load_time_ratio | vcodec | pix_fmt | | | |
-| ---------------------------------- | ------- | ------- | -------- | ------- | --------- |
-| | libx264 | | libx265 | | libsvtav1 |
-| repo_id | yuv420p | yuv444p | yuv420p | yuv444p | yuv420p |
-| lerobot/pusht_image | 6.45 | 5.19 | **1.90** | 2.12 | 2.47 |
-| aliberts/aloha_mobile_shrimp_image | 11.80 | 7.92 | 0.71 | 0.85 | **0.48** |
-| aliberts/paris_street | 2.21 | 2.05 | 0.36 | 0.49 | **0.30** |
-| aliberts/kitchen | 1.46 | 1.46 | 0.28 | 0.51 | **0.26** |
+| video_images_load_time_ratio | vcodec | pix_fmt | | | |
+| --------------------------------- | ------- | ------- | -------- | ------- | --------- |
+| | libx264 | | libx265 | | libsvtav1 |
+| repo_id | yuv420p | yuv444p | yuv420p | yuv444p | yuv420p |
+| lerobot/pusht_image | 6.45 | 5.19 | **1.90** | 2.12 | 2.47 |
+| lerobot/aloha_mobile_shrimp_image | 11.80 | 7.92 | 0.71 | 0.85 | **0.48** |
+| lerobot/paris_street | 2.21 | 2.05 | 0.36 | 0.49 | **0.30** |
+| lerobot/kitchen | 1.46 | 1.46 | 0.28 | 0.51 | **0.26** |
-| | | vcodec | pix_fmt | | | |
-| ---------------------------------- | -------- | -------- | ------------ | -------- | --------- | ------------ |
-| | | libx264 | | libx265 | | libsvtav1 |
-| repo_id | metric | yuv420p | yuv444p | yuv420p | yuv444p | yuv420p |
-| lerobot/pusht_image | avg_mse | 2.90E-04 | **2.03E-04** | 3.13E-04 | 2.29E-04 | 2.19E-04 |
-| | avg_psnr | 35.44 | 37.07 | 35.49 | **37.30** | 37.20 |
-| | avg_ssim | 98.28% | **98.85%** | 98.31% | 98.84% | 98.72% |
-| aliberts/aloha_mobile_shrimp_image | avg_mse | 2.76E-04 | 2.59E-04 | 3.17E-04 | 3.06E-04 | **1.30E-04** |
-| | avg_psnr | 35.91 | 36.21 | 35.88 | 36.09 | **40.17** |
-| | avg_ssim | 95.19% | 95.18% | 95.00% | 95.05% | **97.73%** |
-| aliberts/paris_street | avg_mse | 6.89E-04 | 6.70E-04 | 4.03E-03 | 4.02E-03 | **3.09E-04** |
-| | avg_psnr | 33.48 | 33.68 | 32.05 | 32.15 | **35.40** |
-| | avg_ssim | 93.76% | 93.75% | 89.46% | 89.46% | **95.46%** |
-| aliberts/kitchen | avg_mse | 2.50E-04 | 2.24E-04 | 4.28E-04 | 4.18E-04 | **1.53E-04** |
-| | avg_psnr | 36.73 | 37.33 | 36.56 | 36.75 | **39.12** |
-| | avg_ssim | 95.47% | 95.58% | 95.52% | 95.53% | **96.82%** |
+| | | vcodec | pix_fmt | | | |
+| --------------------------------- | -------- | -------- | ------------ | -------- | --------- | ------------ |
+| | | libx264 | | libx265 | | libsvtav1 |
+| repo_id | metric | yuv420p | yuv444p | yuv420p | yuv444p | yuv420p |
+| lerobot/pusht_image | avg_mse | 2.90E-04 | **2.03E-04** | 3.13E-04 | 2.29E-04 | 2.19E-04 |
+| | avg_psnr | 35.44 | 37.07 | 35.49 | **37.30** | 37.20 |
+| | avg_ssim | 98.28% | **98.85%** | 98.31% | 98.84% | 98.72% |
+| lerobot/aloha_mobile_shrimp_image | avg_mse | 2.76E-04 | 2.59E-04 | 3.17E-04 | 3.06E-04 | **1.30E-04** |
+| | avg_psnr | 35.91 | 36.21 | 35.88 | 36.09 | **40.17** |
+| | avg_ssim | 95.19% | 95.18% | 95.00% | 95.05% | **97.73%** |
+| lerobot/paris_street | avg_mse | 6.89E-04 | 6.70E-04 | 4.03E-03 | 4.02E-03 | **3.09E-04** |
+| | avg_psnr | 33.48 | 33.68 | 32.05 | 32.15 | **35.40** |
+| | avg_ssim | 93.76% | 93.75% | 89.46% | 89.46% | **95.46%** |
+| lerobot/kitchen | avg_mse | 2.50E-04 | 2.24E-04 | 4.28E-04 | 4.18E-04 | **1.53E-04** |
+| | avg_psnr | 36.73 | 37.33 | 36.56 | 36.75 | **39.12** |
+| | avg_ssim | 95.47% | 95.58% | 95.52% | 95.53% | **96.82%** |
+2 -2
@@ -27,10 +27,10 @@
title: Porting Large Datasets
- local: using_dataset_tools
title: Using the Dataset Tools
-  - local: annotation_tools
-    title: Using the Annotation Tools
- local: dataset_subtask
title: Using Subtasks in the Dataset
+  - local: streaming_video_encoding
+    title: Streaming Video Encoding
title: "Datasets"
- sections:
- local: act
+3
@@ -88,5 +88,8 @@ lerobot-record \
--dataset.repo_id=${HF_USER}/eval_act_your_dataset \
--dataset.num_episodes=10 \
--dataset.single_task="Your task description" \
+--dataset.streaming_encoding=true \
+--dataset.encoder_threads=2 \
+# --dataset.vcodec=auto \
--policy.path=${HF_USER}/act_policy
```
-425
@@ -1,425 +0,0 @@
# Dataset Annotation Tools
This guide explains how to use the automatic annotation tools to add skill labels and synthetic dialogue to your LeRobot datasets.
## Overview
The annotation pipeline consists of two main components:
1. **Subtask Annotation** (`subtask_annotate.py`): Automatically segments robot demonstrations into atomic skills using Vision-Language Models (VLMs)
2. **High-Level Annotation** (`high_level_annotate.py`): Generates synthetic user prompts and robot utterances for hierarchical policy training
These tools enable you to transform raw robot demonstration data into richly annotated datasets suitable for training hierarchical policies.
## Installation Requirements
Before using the annotation tools, ensure you have the required dependencies:
```bash
pip install transformers qwen-vl-utils opencv-python rich pandas pyarrow
```
You'll also need FFmpeg for video processing:
```bash
# Ubuntu/Debian
sudo apt-get install ffmpeg
# macOS
brew install ffmpeg
```
## Part 1: Subtask Annotation
### What It Does
The subtask annotator segments each episode into short atomic manipulation skills (1-3 seconds each). For example, a "pick and place" episode might be segmented into:
- "reach towards object" (0.0s - 1.2s)
- "grasp object" (1.2s - 2.1s)
- "lift object" (2.1s - 3.5s)
- "move to target" (3.5s - 5.0s)
- "release object" (5.0s - 6.2s)
### Usage
#### Basic Example
```bash
python src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
--repo-id your-username/your-dataset \
--video-key observation.images.base \
--output-dir /path/to/output
```
#### With Local Dataset
```bash
python src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
--data-dir /path/to/local/dataset \
--video-key observation.images.base \
--output-dir /path/to/output
```
#### Advanced Options
```bash
python src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
--repo-id your-username/your-dataset \
--video-key observation.images.base \
--model Qwen/Qwen2-VL-7B-Instruct \
--batch-size 16 \
--output-dir /path/to/output \
--push-to-hub
```
### Parameters
| Parameter | Description | Default |
|-----------|-------------|---------|
| `--repo-id` | HuggingFace Hub dataset ID | Required (or use --data-dir) |
| `--data-dir` | Path to local dataset | Required (or use --repo-id) |
| `--video-key` | Video observation key | Required |
| `--model` | VLM model to use | `Qwen/Qwen2-VL-7B-Instruct` |
| `--device` | Device to run model on | `cuda` |
| `--dtype` | Model dtype | `bfloat16` |
| `--batch-size` | Episodes per batch | `8` |
| `--episodes` | Specific episodes to annotate | All episodes |
| `--output-dir` | Output directory | Auto-generated |
| `--push-to-hub` | Push to HuggingFace Hub | `False` |
### Supported Models
- **Qwen2-VL**: `Qwen/Qwen2-VL-2B-Instruct`, `Qwen/Qwen2-VL-7B-Instruct`, `Qwen/Qwen2-VL-72B-Instruct`
- **Qwen3-VL**: `Qwen/Qwen3-VL-30B-A3B-Instruct`
### Output Files
The subtask annotation creates the following files in your dataset:
1. **`meta/subtasks.parquet`**: DataFrame with unique subtask names
```python
# Structure:
# Index: subtask name (string)
# Column: subtask_index (int64)
```
2. **`meta/skills.json`**: Raw skill annotations with timestamps
```json
{
"coarse_description": "Pick and place the object",
"skill_to_subtask_index": {
"reach towards object": 0,
"grasp object": 1,
...
},
"episodes": {
"0": {
"episode_index": 0,
"description": "Pick and place the object",
"skills": [
{"name": "reach towards object", "start": 0.0, "end": 1.2},
{"name": "grasp object", "start": 1.2, "end": 2.1},
...
]
}
}
}
```
3. **`subtask_index` feature**: Added to each frame in the dataset
- Type: `int64`
- Shape: `(1,)`
- Maps each frame to its corresponding subtask
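The mapping from timestamped skills to a per-frame `subtask_index` can be sketched as follows. This is a minimal illustration under stated assumptions, not the annotator's actual implementation: the function name `frame_subtask_indices`, the half-open interval convention `[start, end)`, and the `-1` fallback for uncovered frames are all hypothetical choices.

```python
def frame_subtask_indices(skills, skill_to_subtask_index, fps, num_frames):
    """Return one subtask index per frame, or -1 if no skill covers the frame.

    `skills` is a list of {"name", "start", "end"} dicts, with times in
    seconds, as in the `skills.json` example above.
    """
    indices = []
    for frame in range(num_frames):
        timestamp = frame / fps
        index = -1  # assumption: frames outside every skill get -1
        for skill in skills:
            # Assumption: a skill covers the half-open interval [start, end).
            if skill["start"] <= timestamp < skill["end"]:
                index = skill_to_subtask_index[skill["name"]]
                break
        indices.append(index)
    return indices


skills = [
    {"name": "reach towards object", "start": 0.0, "end": 1.2},
    {"name": "grasp object", "start": 1.2, "end": 2.1},
]
mapping = {"reach towards object": 0, "grasp object": 1}
indices = frame_subtask_indices(skills, mapping, fps=10, num_frames=25)
```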
### Accessing Subtask Annotations
```python
from lerobot.datasets.lerobot_dataset import LeRobotDataset
# Load annotated dataset
dataset = LeRobotDataset(repo_id="your/dataset_with_subtasks")
# Get a frame
frame = dataset[100]
# Get the subtask for this frame
subtask_idx = frame["subtask_index"].item()
subtask_name = dataset.meta.subtasks.iloc[subtask_idx].name
print(f"Frame 100 is performing: {subtask_name}")
# Load all subtasks
subtasks_df = dataset.meta.subtasks
print(subtasks_df)
```
## Part 2: High-Level Annotation
### What It Does
The high-level annotator generates synthetic dialogue for hierarchical policy training. For each skill, it creates:
- **User Prompt** (`ℓ_t`): A natural language request from the user
- **Robot Utterance** (`u_t`): A natural language response from the robot
This enables training policies that can understand and respond to human instructions in natural dialogue.
### Prerequisites
**Important**: You must run subtask annotation first! High-level annotation requires the `skills.json` file generated by subtask annotation.
### Usage
#### Image Mode (Default)
Samples frames at regular intervals and passes images to the VLM:
```bash
python src/lerobot/policies/pi05_full/annotate/high_level_annotate.py \
--repo-id your/dataset_with_subtasks \
--model Qwen/Qwen2-VL-7B-Instruct \
--image-key observation.images.base \
--output-dir /path/to/output
```
#### Video Mode
Passes entire episode videos to the VLM for better temporal understanding:
```bash
python src/lerobot/policies/pi05_full/annotate/high_level_annotate.py \
--repo-id your/dataset_with_subtasks \
--model Qwen/Qwen2-VL-7B-Instruct \
--video-mode \
--video-key observation.images.base \
--video-batch-size 4 \
--output-dir /path/to/output
```
### Parameters
| Parameter | Description | Default |
|-----------|-------------|---------|
| `--repo-id` | HuggingFace Hub dataset ID | Required (or use --data-dir) |
| `--data-dir` | Path to local dataset | Required (or use --repo-id) |
| `--model` | VLM model to use | `Qwen/Qwen2-VL-7B-Instruct` |
| `--image-key` | Image observation key (image mode) | First camera key |
| `--video-mode` | Use video instead of images | `False` |
| `--video-key` | Video observation key (video mode) | Auto-detected |
| `--video-batch-size` | Episodes per batch (video mode) | `1` |
| `--sample-interval` | Sampling interval in seconds | `1.0` |
| `--temperature` | Sampling temperature | `0.7` |
| `--output-dir` | Output directory | Auto-generated |
| `--push-to-hub` | Push to HuggingFace Hub | `False` |
### Output Files
The high-level annotation creates:
1. **`meta/tasks_high_level.parquet`**: DataFrame with high-level tasks
```python
# Structure:
# Index: task string (concatenated user_prompt | robot_utterance)
# Columns:
# - task_index: int64
# - user_prompt: string
# - robot_utterance: string
# - skill: string (associated subtask)
# - scenario_type: string
# - response_type: string
```
2. **`meta/syn_annotations.jsonl`**: Debug annotations (JSONL format)
```json
{"episode_id": 0, "timestamp": 1.5, "skill_current": "grasp object", "user_prompt": "Can you pick that up?", "robot_utterance": "Sure, I'll grasp it now", ...}
```
3. **`task_index_high_level` feature**: Added to each frame
- Type: `int64`
- Shape: `(1,)`
- Maps each frame to its high-level task
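Since `meta/syn_annotations.jsonl` stores one JSON object per line, it can be inspected with the standard library alone. The sketch below groups records by `episode_id`; the helper name `group_by_episode` is hypothetical, and the sample records only use the fields shown in the example line above.

```python
import json
from collections import defaultdict


def group_by_episode(lines):
    """Parse JSONL lines and group the records by their `episode_id`."""
    grouped = defaultdict(list)
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        grouped[record["episode_id"]].append(record)
    return dict(grouped)


sample_lines = [
    '{"episode_id": 0, "timestamp": 1.5, "skill_current": "grasp object", '
    '"user_prompt": "Can you pick that up?", '
    '"robot_utterance": "Sure, I\'ll grasp it now"}',
    '{"episode_id": 1, "timestamp": 0.5, "skill_current": "reach towards object", '
    '"user_prompt": "Pick up the red block", '
    '"robot_utterance": "OK, I\'ll pick it up"}',
]
by_episode = group_by_episode(sample_lines)
```

For a real dataset, the same function can be fed an open file handle, for example `group_by_episode(open(path))`, where `path` points at the JSONL file.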
### Dialogue Types Generated
The system generates diverse interaction types:
**Scenario Types:**
- `specific_object`: "Pick up the red block"
- `negative_task`: "Don't touch the blue one"
- `situated_correction`: "Actually, move to the other box instead"
- `implicit_request`: "I need something red for the tower"
- `constraint_based`: "Make sure to handle it gently"
**Response Types:**
- `confirmation`: "OK, I'll pick it up"
- `clarification`: "Just to confirm, you want me to pick up the red block?"
- `acknowledgment`: "Got it, picking up the red block"
- `constraint_acknowledgment`: "Sure, I'll pick it up gently"
### Accessing High-Level Annotations
```python
from lerobot.datasets.lerobot_dataset import LeRobotDataset
import pandas as pd
# Load annotated dataset
dataset = LeRobotDataset(repo_id="your/dataset_with_high_level_tasks")
# Get a frame
frame = dataset[100]
# Get the high-level task
task_idx = frame["task_index_high_level"].item()
# Load tasks metadata
tasks_df = pd.read_parquet(dataset.root / "meta" / "tasks_high_level.parquet")
task_row = tasks_df[tasks_df["task_index"] == task_idx].iloc[0]
print(f"User: {task_row['user_prompt']}")
print(f"Robot: {task_row['robot_utterance']}")
print(f"Skill: {task_row['skill']}")
# Use in a DataLoader
import torch
from torch.utils.data import DataLoader
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
batch = next(iter(dataloader))
print(f"Task indices: {batch['task_index_high_level']}")
print(f"User prompts: {batch['user_prompt'][0]}")
print(f"Robot utterances: {batch['robot_utterance'][0]}")
```
## Complete Pipeline Example
Here's how to run both annotation stages:
```bash
#!/bin/bash
REPO_ID="your-username/your-dataset"
MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/path/to/output"
# Step 1: Subtask Annotation
python src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
--repo-id "$REPO_ID" \
--video-key observation.images.base \
--model "$MODEL" \
--batch-size 8 \
--output-dir "${OUTPUT_DIR}/subtasks"
# Step 2: High-Level Annotation (Image Mode)
python src/lerobot/policies/pi05_full/annotate/high_level_annotate.py \
--data-dir "${OUTPUT_DIR}/subtasks" \
--model "$MODEL" \
--image-key observation.images.base \
--sample-interval 1.0 \
--output-dir "${OUTPUT_DIR}/final"
# Or Step 2: High-Level Annotation (Video Mode - Recommended)
python src/lerobot/policies/pi05_full/annotate/high_level_annotate.py \
--data-dir "${OUTPUT_DIR}/subtasks" \
--model "$MODEL" \
--video-mode \
--video-key observation.images.base \
--video-batch-size 4 \
--output-dir "${OUTPUT_DIR}/final"
```
## Performance Tips
### For Faster Processing
1. **Increase batch size**: Use `--batch-size 16` or higher (subtask annotation)
2. **Increase video batch size**: Use `--video-batch-size 8` (high-level annotation in video mode)
3. **Larger sampling interval**: Use `--sample-interval 5.0` for testing (samples every 5 seconds instead of 1)
4. **Use smaller models**: `Qwen/Qwen2-VL-2B-Instruct` is faster than `Qwen2-VL-7B-Instruct`
5. **Process specific episodes**: Use `--episodes 0 1 2 3` to annotate only a subset
### For Better Quality
1. **Use larger models**: `Qwen/Qwen3-VL-30B-A3B-Instruct` or `Qwen/Qwen2-VL-72B-Instruct`
2. **Use video mode**: Provides better temporal context
3. **Smaller sampling intervals**: `--sample-interval 0.5` for dense annotations
4. **Adjust temperature**: Use `--temperature 0.9` for more diverse dialogue
## Memory Requirements
| Model | GPU Memory | Recommended Batch Size |
|-------|------------|------------------------|
| Qwen2-VL-2B | ~8 GB | 16-32 |
| Qwen2-VL-7B | ~16 GB | 8-16 |
| Qwen2-VL-72B | ~80 GB | 1-2 |
| Qwen3-VL-30B | ~40 GB | 4-8 |
## Troubleshooting
### "FFmpeg not found"
```bash
# Install FFmpeg
sudo apt-get install ffmpeg # Ubuntu/Debian
brew install ffmpeg # macOS
```
### "CUDA out of memory"
- Reduce batch size: `--batch-size 1` or `--video-batch-size 1`
- Use smaller model: `Qwen/Qwen2-VL-2B-Instruct`
- Use CPU: `--device cpu` (much slower)
### "No skills.json found"
Run subtask annotation first before high-level annotation.
### "Video key not found"
List available keys:
```python
from lerobot.datasets.lerobot_dataset import LeRobotDataset
dataset = LeRobotDataset(repo_id="your/dataset")
print("Video keys:", dataset.meta.video_keys)
print("Camera keys:", dataset.meta.camera_keys)
```
## Dataset Structure After Annotation
```
your_dataset_with_high_level_tasks/
├── meta/
│ ├── info.json # Original metadata
│ ├── tasks.parquet # Original tasks (preserved)
│ ├── subtasks.parquet # NEW: Subtask names and indices
│ ├── skills.json # NEW: Raw skill annotations with timestamps
│ ├── tasks_high_level.parquet # NEW: High-level tasks with dialogue
│ └── syn_annotations.jsonl # NEW: Debug annotations
├── data/
│ └── chunk-000/
│ ├── observation.images.base.mp4
│ ├── action.safetensors
│ ├── subtask_index.safetensors # NEW: Subtask per frame
│ └── task_index_high_level.safetensors # NEW: High-level task per frame
└── videos/
└── ...
```
## Citation
If you use these annotation tools in your research, please cite:
```bibtex
@article{lerobot2024,
title={LeRobot: State-of-the-art Machine Learning for Real-World Robotics},
author={LeRobot Contributors},
year={2024},
url={https://github.com/huggingface/lerobot}
}
```
## Next Steps
After annotation, you can:
1. Train hierarchical policies using the subtask and high-level annotations
2. Use the synthetic dialogue for instruction-following policy training
3. Analyze skill distributions and dialogue patterns
4. Share your annotated dataset on the Hugging Face Hub with `--push-to-hub`
For training examples, see the [training documentation](../training/).
+4 -1
View File
@@ -185,13 +185,16 @@ echo $HF_USER
Use the standard recording command:
```bash
python src/lerobot/scripts/lerobot_record.py \
lerobot-record \
--robot.type=earthrover_mini_plus \
--teleop.type=keyboard_rover \
--dataset.repo_id=your_username/dataset_name \
--dataset.num_episodes=2 \
--dataset.fps=10 \
--dataset.single_task="Navigate around obstacles" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
--display_data=true
# Optional: add --dataset.vcodec=auto to the command above to auto-detect a hardware encoder.
```
+77 -3
View File
@@ -55,7 +55,8 @@ To make your environment loadable from the Hub, your repository must contain at
**`env.py`** (or custom Python file)
- Must expose a `make_env(n_envs: int, use_async_envs: bool)` function
- Must expose a `make_env(n_envs: int, use_async_envs: bool, **kwargs)` function
- The function should accept `**kwargs` to allow users to pass custom configurations
- This function should return one of:
- A `gym.vector.VectorEnv` (most common)
- A single `gym.Env` (will be automatically wrapped)
@@ -99,6 +100,8 @@ Create an `env.py` file with a `make_env` function:
```python
# env.py
import gymnasium as gym
from pathlib import Path
from typing import Any
def make_env(n_envs: int = 1, use_async_envs: bool = False):
"""
@@ -250,6 +253,76 @@ envs_dict = make_env(
)
```
### Custom Configuration via kwargs
Hub environments can accept custom configurations through keyword arguments. This is useful for parameterizing tasks, loading different objects, or overriding default settings:
```python
from pathlib import Path
# Pass a config file path
envs_dict = make_env(
"nvkartik/isaaclab-arena-envs:envs/microwave_g1.py",
n_envs=4,
trust_remote_code=True,
config_path=Path("/path/to/my_config.yaml"),
)
# Pass config overrides as a dictionary
envs_dict = make_env(
"nvkartik/isaaclab-arena-envs:envs/microwave_g1.py",
n_envs=4,
trust_remote_code=True,
config_overrides={
"scene.object": "microwave",
"sim.dt": 0.01,
},
)
# Combine config path with overrides
envs_dict = make_env(
"username/my-env",
n_envs=4,
trust_remote_code=True,
config_path="configs/gr1_pick_place.yaml",
config_overrides={"scene.table_objects": ["apple", "banana", "cup"]},
)
```
Any keyword arguments you pass will be forwarded to the hub environment's `make_env` function. Check the environment's documentation for supported configuration options.
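On the environment side, one way to consume these arguments is to merge dotted-key overrides into a nested default configuration. The following is a minimal sketch, not the implementation used by any particular hub environment: `DEFAULT_CONFIG`, `apply_overrides`, and `resolve_config` are hypothetical names, and the dotted-key convention is assumed from the `config_overrides` examples above.

```python
import copy
from typing import Any

# Hypothetical defaults; a real hub environment would define its own.
DEFAULT_CONFIG: dict[str, Any] = {"scene": {"object": "cube"}, "sim": {"dt": 0.02}}


def apply_overrides(config: dict[str, Any], overrides: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of `config` with dotted-key overrides applied."""
    result = copy.deepcopy(config)
    for dotted_key, value in overrides.items():
        *parents, leaf = dotted_key.split(".")
        node = result
        for key in parents:
            # Create intermediate levels that do not exist yet.
            node = node.setdefault(key, {})
        node[leaf] = value
    return result


def resolve_config(**kwargs: Any) -> dict[str, Any]:
    """Build the final config from the kwargs forwarded to `make_env`."""
    return apply_overrides(DEFAULT_CONFIG, kwargs.get("config_overrides", {}))
```

A `make_env(n_envs, use_async_envs, **kwargs)` implementation could call `resolve_config(**kwargs)` before constructing its environments, so that unknown keys are ignored rather than raising.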
### Using Custom kwargs with lerobot-eval
When evaluating policies using the `lerobot-eval` CLI, you can pass custom kwargs to hub environments using the `--env_kwargs.` prefix:
```bash
lerobot-eval \
--policy.path=user123/example-policy-checkpoint \
--env=user123/example-sim-backend \
--eval.batch_size=1 \
--eval.n_episodes=10 \
--env_kwargs.task_id=demo_task_alpha \
--env_kwargs.agent_profile=arm_v2 \
--env_kwargs.target_item=object_red \
--env_kwargs.run_mode=offscreen \
--env_kwargs.enable_sensors=true \
--env_kwargs.record_output=true \
--env_kwargs.output_horizon=10 \
--env_kwargs.output_stride=15 \
--env_kwargs.state_features=joint_angles \
--env_kwargs.visual_streams=front_camera
```
All `--env_kwargs.*` arguments will be collected into a dictionary and passed as keyword arguments to the hub environment's `make_env` function. This allows you to:
- Pass configuration file paths
- Override default settings
- Specify custom task parameters
- Control simulation options (headless mode, camera settings, etc.)
- Select different embodiments or objects
The hub environment's `make_env` function receives these as regular keyword arguments, so check the environment's documentation for the available options.
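To illustrate the idea of collecting prefixed arguments into a dictionary, here is a minimal sketch. It is not the parser that `lerobot-eval` uses; `collect_env_kwargs` is a hypothetical helper, and it leaves every value as a string, whereas the real CLI may convert types.

```python
def collect_env_kwargs(argv: list[str], prefix: str = "--env_kwargs.") -> dict[str, str]:
    """Collect `--env_kwargs.key=value` arguments into a {key: value} dict."""
    env_kwargs: dict[str, str] = {}
    for arg in argv:
        if arg.startswith(prefix) and "=" in arg:
            # Split only on the first "=" so values may contain "=" themselves.
            key, value = arg[len(prefix):].split("=", 1)
            env_kwargs[key] = value
    return env_kwargs


example_argv = [
    "--eval.n_episodes=10",
    "--env_kwargs.task_id=demo_task_alpha",
    "--env_kwargs.run_mode=offscreen",
]
print(collect_env_kwargs(example_argv))
```

The resulting dictionary is what would be unpacked into the hub environment's `make_env(..., **env_kwargs)` call.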
## URL Format Reference
The hub URL format supports several patterns:
@@ -266,7 +339,7 @@ The hub URL format supports several patterns:
For benchmarks with multiple tasks (like LIBERO), return a nested dictionary:
```python
def make_env(n_envs: int = 1, use_async_envs: bool = False):
def make_env(n_envs: int = 1, use_async_envs: bool = False, **kwargs):
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
# Return dict: {suite_name: {task_id: VectorEnv}}
@@ -388,8 +461,9 @@ pip install gymnasium numpy
Your `env.py` must expose a `make_env` function:
```python
def make_env(n_envs: int, use_async_envs: bool):
def make_env(n_envs: int, use_async_envs: bool, **kwargs):
# Your implementation
# kwargs can include config_path, config_overrides, etc.
pass
```
+6 -3
View File
@@ -120,9 +120,12 @@ lerobot-record \
--display_data=true \
--dataset.repo_id=<user>/eval_groot-bimanual \
--dataset.num_episodes=10 \
--dataset.single_task="Grab and handover the red cube to the other arm"
--policy.path=<user>/groot-bimanual # your trained model
--dataset.episode_time_s=30
--dataset.single_task="Grab and handover the red cube to the other arm" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
--policy.path=<user>/groot-bimanual \
--dataset.episode_time_s=30 \
--dataset.reset_time_s=10
# --policy.path points to your trained model.
# Optional: add --dataset.vcodec=auto to the command above to auto-detect a hardware encoder.
```
+11 -5
View File
@@ -224,12 +224,15 @@ lerobot-record \
--teleop.port=/dev/tty.usbmodem1201 \
--teleop.id=right \
--teleop.side=right \
--dataset.repo_id=nepyope/hand_record_test_with_video_data \
--dataset.repo_id=<USER>/hand_record_test_with_video_data \
--dataset.single_task="Hand recording test with video data" \
--dataset.num_episodes=1 \
--dataset.episode_time_s=5 \
--dataset.push_to_hub=true \
--dataset.private=true \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
--display_data=true
# Optional: add --dataset.vcodec=auto to the command above to auto-detect a hardware encoder.
```
@@ -241,7 +244,7 @@ lerobot-replay \
--robot.port=/dev/tty.usbmodem58760432281 \
--robot.id=right \
--robot.side=right \
--dataset.repo_id=nepyope/hand_record_test_with_camera \
--dataset.repo_id=<USER>/hand_record_test_with_camera \
--dataset.episode=0
```
@@ -249,13 +252,13 @@ lerobot-replay \
```bash
lerobot-train \
--dataset.repo_id=nepyope/hand_record_test_with_video_data \
--dataset.repo_id=<USER>/hand_record_test_with_video_data \
--policy.type=act \
--output_dir=outputs/train/hopejr_hand \
--job_name=hopejr \
--policy.device=mps \
--wandb.enable=true \
--policy.repo_id=nepyope/hand_test_policy
--policy.repo_id=<USER>/hand_test_policy
```
### Evaluate
@@ -270,8 +273,11 @@ lerobot-record \
--robot.side=right \
--robot.cameras='{"main": {"type": "opencv", "index_or_path": 0, "width": 640, "height": 480, "fps": 30}}' \
--display_data=false \
--dataset.repo_id=nepyope/eval_hopejr \
--dataset.repo_id=<USER>/eval_hopejr \
--dataset.single_task="Evaluate hopejr hand policy" \
--dataset.num_episodes=10 \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
--policy.path=outputs/train/hopejr_hand/checkpoints/last/pretrained_model
# Optional: add --dataset.vcodec=auto to the command above to auto-detect a hardware encoder.
```
+8 -2
View File
@@ -165,7 +165,7 @@ huggingface-cli login --token ${HUGGINGFACE_TOKEN} --add-to-git-credential
Then store your Hugging Face repository name in a variable:
```bash
HF_USER=$(hf auth whoami | head -n 1)
HF_USER=$(hf auth whoami | awk -F': *' 'NR==1 {print $2}')
echo $HF_USER
```
@@ -185,7 +185,10 @@ lerobot-record \
--display_data=true \
--dataset.repo_id=${HF_USER}/record-test \
--dataset.num_episodes=5 \
--dataset.single_task="Grab the black cube"
--dataset.single_task="Grab the black cube" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2
# Optional: add --dataset.vcodec=auto to the command above to auto-detect a hardware encoder.
```
</hfoption>
<hfoption id="API example">
@@ -515,6 +518,9 @@ lerobot-record \
--display_data=false \
--dataset.repo_id=${HF_USER}/eval_so100 \
--dataset.single_task="Put lego brick into the transparent box" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
# --dataset.vcodec=auto \
# <- Teleop optional if you want to teleoperate in between episodes \
# --teleop.type=so100_leader \
# --teleop.port=/dev/ttyACM0 \
+12 -3
View File
@@ -1,13 +1,15 @@
# Installation
## Install [`miniforge`](https://conda-forge.org/download/)
This guide uses conda (via miniforge) to manage environments. If you prefer another environment manager (e.g. `uv`, `venv`), ensure you have Python >=3.10 and ffmpeg installed with the `libsvtav1` encoder, then skip ahead to [Install LeRobot](#step-3-install-lerobot-).
## Step 1: Install [`miniforge`](https://conda-forge.org/download/)
```bash
wget "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh"
bash Miniforge3-$(uname)-$(uname -m).sh
```
## Environment Setup
## Step 2: Environment Setup
Create a virtual environment with Python 3.10, using conda:
@@ -38,7 +40,14 @@ conda install ffmpeg -c conda-forge
>
> - _[On Linux only]_ If you want to bring your own ffmpeg: Install [ffmpeg build dependencies](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#GettheDependencies) and [compile ffmpeg from source with libsvtav1](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#libsvtav1), and make sure you use the corresponding ffmpeg binary to your install with `which ffmpeg`.
## Install LeRobot 🤗
> [!NOTE]
> When installing LeRobot inside WSL (Windows Subsystem for Linux), make sure to install `evdev` with the following command:
>
> ```bash
> conda install evdev -c conda-forge
> ```
## Step 3: Install LeRobot 🤗
### From Source
+4 -1
View File
@@ -41,7 +41,10 @@ lerobot-record \
--display_data=true \
--dataset.repo_id=${HF_USER}/record-test \
--dataset.num_episodes=5 \
--dataset.single_task="Grab the black cube"
--dataset.single_task="Grab the black cube" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2
# Optional: add --dataset.vcodec=auto to the command above to auto-detect a hardware encoder.
```
See the [recording guide](./il_robots#record-a-dataset) for more details.
+9 -5
View File
@@ -66,12 +66,13 @@ Run on of the examples scripts to teleoperate, record a dataset, replay a datase
All scripts assume you configured your robot (e.g., SO-100 follower) and set the correct serial port.
Additionally you need to **copy the urdf of the robot to the examples folder**. For the examples in this tutorial (Using SO100/SO101) it is highly recommended to use the urdf in the [SO-ARM100 repo](https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf)
Additionally you need to **copy the URDF of the robot into the examples folder**. For the examples in this tutorial (using SO100/SO101), copy the `SO101` folder from the [SO-ARM100 repo](https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101) into the `examples/phone_to_so100/` directory, so that the URDF file path becomes `examples/phone_to_so100/SO101/so101_new_calib.urdf`.
- Run this example to teleoperate:
```bash
python examples/phone_to_so100/teleoperate.py
cd examples/phone_to_so100
python teleoperate.py
```
After running the example:
@@ -84,19 +85,22 @@ Additionally you can customize mapping or safety limits by editing the processor
- Run this example to record a dataset, which saves absolute end effector observations and actions:
```bash
python examples/phone_to_so100/record.py
cd examples/phone_to_so100
python record.py
```
- Run this example to replay recorded episodes:
```bash
python examples/phone_to_so100/replay.py
cd examples/phone_to_so100
python replay.py
```
- Run this example to evaluate a pretrained policy:
```bash
python examples/phone_to_so100/evaluate.py
cd examples/phone_to_so100
python evaluate.py
```
### Important pipeline steps and options
+1 -1
View File
@@ -60,7 +60,7 @@ policy.type=pi0
For training π₀, you can use the standard LeRobot training script with the appropriate configuration:
```bash
python src/lerobot/scripts/lerobot_train.py \
lerobot-train \
--dataset.repo_id=your_dataset \
--policy.type=pi0 \
--output_dir=./outputs/pi0_training \
+1 -1
View File
@@ -56,7 +56,7 @@ policy.type=pi05
Here's a complete training command for finetuning the base π₀.₅ model on your own dataset:
```bash
python src/lerobot/scripts/lerobot_train.py\
lerobot-train \
--dataset.repo_id=your_dataset \
--policy.type=pi05 \
--output_dir=./outputs/pi05_training \
+6
View File
@@ -159,6 +159,9 @@ lerobot-record \
--dataset.fps=15 \
--dataset.push_to_hub=true \
--dataset.private=true \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
--display_data=true
# Optional: add --dataset.vcodec=auto to the command above to auto-detect a hardware encoder.
```
@@ -198,6 +201,9 @@ lerobot-record \
--dataset.fps=15 \
--dataset.push_to_hub=true \
--dataset.private=true \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
--display_data=true
# Optional: add --dataset.vcodec=auto to the command above to auto-detect a hardware encoder.
```
+4 -4
View File
@@ -269,7 +269,7 @@ This generates visualizations showing video frames with subtask boundaries overl
Train with **no annotations** - uses linear progress from 0 to 1:
```bash
python src/lerobot/scripts/lerobot_train.py \
lerobot-train \
--dataset.repo_id=your-username/your-dataset \
--policy.type=sarm \
--policy.annotation_mode=single_stage \
@@ -288,7 +288,7 @@ python src/lerobot/scripts/lerobot_train.py \
Train with **dense annotations only** (sparse auto-generated):
```bash
python src/lerobot/scripts/lerobot_train.py \
lerobot-train \
--dataset.repo_id=your-username/your-dataset \
--policy.type=sarm \
--policy.annotation_mode=dense_only \
@@ -307,7 +307,7 @@ python src/lerobot/scripts/lerobot_train.py \
Train with **both sparse and dense annotations**:
```bash
python src/lerobot/scripts/lerobot_train.py \
lerobot-train \
--dataset.repo_id=your-username/your-dataset \
--policy.type=sarm \
--policy.annotation_mode=dual \
@@ -468,7 +468,7 @@ This script:
Once you have the progress file, train your policy with RA-BC weighting. The progress file is auto-detected from the dataset path (`sarm_progress.parquet`). Currently PI0, PI0.5 and SmolVLA are supported with RA-BC:
```bash
python src/lerobot/scripts/lerobot_train.py \
lerobot-train \
--dataset.repo_id=your-username/your-dataset \
--policy.type=pi0 \
--use_rabc=true \
+3
View File
@@ -106,6 +106,9 @@ lerobot-record \
--dataset.repo_id=${HF_USER}/eval_DATASET_NAME_test \ # <- This will be the dataset name on HF Hub
--dataset.episode_time_s=50 \
--dataset.num_episodes=10 \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
# --dataset.vcodec=auto \
# <- Teleop optional if you want to teleoperate in between episodes \
# --teleop.type=so100_leader \
# --teleop.port=/dev/ttyACM0 \
+155
View File
@@ -0,0 +1,155 @@
# Streaming Video Encoding Guide
## 1. Overview
Streaming video encoding eliminates the traditional PNG round-trip during video dataset recording. Instead of:
1. Capture frame -> write PNG to disk -> (at episode end) read PNGs -> encode to MP4 -> delete PNGs
Frames can be encoded in real-time during capture:
1. Capture frame -> queue to encoder thread -> encode to MP4 directly
This makes `save_episode()` near-instant (the video is already encoded by the time the episode ends) and removes the blocking wait that previously occurred between episodes, especially with multiple cameras in long episodes.
## 2. Tuning Parameters
| Parameter | CLI Flag | Type | Default | Description |
| ----------------------- | --------------------------------- | ------------- | ------------- | ----------------------------------------------------------------- |
| `streaming_encoding` | `--dataset.streaming_encoding` | `bool` | `True` | Enable real-time encoding during capture |
| `vcodec` | `--dataset.vcodec` | `str` | `"libsvtav1"` | Video codec. `"auto"` detects best HW encoder |
| `encoder_threads`       | `--dataset.encoder_threads`       | `int \| None` | `None` (auto) | Threads per encoder instance. `None` lets the codec decide        |
| `encoder_queue_maxsize` | `--dataset.encoder_queue_maxsize` | `int` | `60` | Max buffered frames per camera (~2s at 30fps). Consumes RAM |
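To get a feel for the RAM cost of `encoder_queue_maxsize`, the worst case can be estimated from the frame size. This is a rough sketch that assumes frames are queued as uncompressed 8-bit arrays (one byte per channel value); the actual in-memory representation may differ, and `queue_ram_bytes` is a hypothetical helper.

```python
def queue_ram_bytes(width: int, height: int, queue_maxsize: int = 60,
                    channels: int = 3, num_cameras: int = 1) -> int:
    """Upper bound on RAM held by full encoder queues, assuming uint8 frames."""
    bytes_per_frame = width * height * channels
    return bytes_per_frame * queue_maxsize * num_cameras


def queue_seconds(queue_maxsize: int, fps: int) -> float:
    """How many seconds of video a full queue holds."""
    return queue_maxsize / fps


for width, height in [(640, 480), (1920, 1080)]:
    megabytes = queue_ram_bytes(width, height) / 1e6
    print(f"{width}x{height}: up to {megabytes:.0f} MB per camera with a full queue")
```

Under these assumptions, the default of 60 frames is small at 640x480 but grows to several hundred megabytes per camera at 1920x1080.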
## 3. Performance Considerations
Streaming encoding means the CPU is encoding video **during** the capture loop, not after. This creates a CPU budget that must be shared between:
- **Control loop** (reading cameras, controlling the robot, writing non-video data)
- **Encoder threads** (one pool per camera)
- **Rerun visualization** (if enabled)
- **OS and other processes**
### Resolution & Number of Cameras Impact
| Setup | Throughput (px/sec) | CPU Encoding Load | Notes |
| ------------------------- | ------------------- | ----------------- | ------------------------------ |
| 2camsx 640x480x3 @30fps | 55M | Low | Works on most systems |
| 2camsx 1280x720x3 @30fps | 165M | Moderate | Comfortable on modern systems |
| 2camsx 1920x1080x3 @30fps | 373M | High | Requires powerful high-end CPU |
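The throughput figures in the table follow from multiplying the number of cameras, the frame dimensions, the channel count, and the frame rate. A small helper (the name `encoding_throughput` is illustrative only) makes it easy to compute the figure for your own setup:

```python
def encoding_throughput(num_cameras: int, width: int, height: int,
                        fps: int, channels: int = 3) -> int:
    """Raw values per second handed to the encoders: cameras x W x H x channels x fps."""
    return num_cameras * width * height * channels * fps


for width, height in [(640, 480), (1280, 720), (1920, 1080)]:
    throughput = encoding_throughput(2, width, height, fps=30)
    print(f"2 cams {width}x{height}x3 @30fps: {throughput / 1e6:.1f}M per second")
```

You can compare the result against the ranges given in the recommended configurations to judge whether a setup is likely to fit your CPU.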
### `encoder_threads` Tuning
This parameter controls how many threads each encoder instance uses internally:
- **Higher values** (e.g., 4-5): Faster encoding, but uses more CPU cores per camera. Good for high-end systems with many cores.
- **Lower values** (e.g., 1-2): Less CPU per camera, freeing cores for capture and visualization. Good for low-res images and less capable CPUs.
- **`None` (default)**: Lets the codec decide. Information available in the codec logs.
### Backpressure and Frame Dropping
Each camera has a bounded queue (`encoder_queue_maxsize`, default 60 frames). When the encoder can't keep up:
1. The queue fills up (consuming RAM)
2. New frames are **dropped** (not blocked) — the capture loop continues uninterrupted
3. A warning is logged: `"Encoder queue full for {camera}, dropped N frame(s)"`
4. At episode end, total dropped frames per camera are reported
### Symptoms of Encoder Falling Behind
- **System feels laggy and freezes**: all CPUs are at 100%
- **Dropped frame warnings** in the log or lower frames/FPS than expected in the recorded dataset
- **Choppy robot movement**: If CPU is severely overloaded, even the capture loop may be affected
- **Accumulated rerun lag**: Visualization falls behind real-time
## 4. Hardware-Accelerated Encoding
### When to Use
Use HW encoding when:
- CPU is the bottleneck (dropped frames, choppy robot, rerun lag)
- You have compatible hardware (GPU or dedicated encoder)
- You're recording at high throughput (high resolution or with many cameras)
### Choosing a Codec
| Codec | CPU Usage | File Size | Quality | Notes |
| --------------------- | --------- | -------------- | ------- | ---------------------------------------------------------------- |
| `libsvtav1` (default) | High | Smallest | Best | Default. Best compression but most CPU-intensive |
| `h264` | Medium | ~30-50% larger | Good | Software H.264. Lower CPU |
| HW encoders | Very Low | Largest | Good | Offloads to dedicated hardware. Best for CPU-constrained systems |
### Available HW Encoders
| Encoder | Platform | Hardware | CLI Value |
| ------------------- | ------------- | ------------------------------------------------------------------------------------------------ | ------------------------------------ |
| `h264_videotoolbox` | macOS | Apple Silicon / Intel | `--dataset.vcodec=h264_videotoolbox` |
| `hevc_videotoolbox` | macOS | Apple Silicon / Intel | `--dataset.vcodec=hevc_videotoolbox` |
| `h264_nvenc` | Linux/Windows | NVIDIA GPU | `--dataset.vcodec=h264_nvenc` |
| `hevc_nvenc` | Linux/Windows | NVIDIA GPU | `--dataset.vcodec=hevc_nvenc` |
| `h264_vaapi` | Linux | Intel/AMD GPU | `--dataset.vcodec=h264_vaapi` |
| `h264_qsv` | Linux/Windows | Intel Quick Sync | `--dataset.vcodec=h264_qsv` |
| `auto` | Any | Probes the system for available HW encoders. Falls back to `libsvtav1` if no HW encoder is found | `--dataset.vcodec=auto` |
> [!NOTE]
> In order to use the HW accelerated encoders you might need to upgrade your GPU drivers.
> [!NOTE]
> `libsvtav1` is the default because it provides the best training performance; other vcodecs can reduce CPU usage and be faster, but they typically produce larger files and may affect training time.
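The probing-with-fallback behavior of `auto` can be pictured as picking the first available encoder from a preference list. The sketch below parses text in the format printed by `ffmpeg -encoders`; it is not LeRobot's detection logic, the preference order is an assumption, and an encoder being compiled into ffmpeg does not guarantee that the matching hardware or driver is usable.

```python
# Assumed preference order, taken from the table above; the real order may differ.
HW_ENCODER_PREFERENCE = [
    "h264_videotoolbox", "hevc_videotoolbox",
    "h264_nvenc", "hevc_nvenc",
    "h264_vaapi", "h264_qsv",
]


def pick_vcodec(encoders_listing: str, fallback: str = "libsvtav1") -> str:
    """Return the first preferred HW encoder found in an `ffmpeg -encoders` listing."""
    available = set()
    for line in encoders_listing.splitlines():
        parts = line.split()
        # Video encoder rows look like: " V....D h264_nvenc   NVIDIA NVENC H.264 encoder"
        if len(parts) >= 2 and parts[0].startswith("V"):
            available.add(parts[1])
    for name in HW_ENCODER_PREFERENCE:
        if name in available:
            return name
    return fallback


sample_listing = """
 V....D libsvtav1            SVT-AV1(Scalable Video Technology for AV1) encoder (codec av1)
 V....D h264_nvenc           NVIDIA NVENC H.264 encoder (codec h264)
"""
print(pick_vcodec(sample_listing))
```

To try it on a real system, you could pass the captured output of `ffmpeg -hide_banner -encoders` as `encoders_listing`.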
## 5. Troubleshooting
| Symptom | Likely Cause | Fix |
| ------------------------------------------------------------------ | -------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| System freezes or choppy robot movement or Rerun visualization lag | CPU starved (100% load usage) | Close other apps, reduce encoding throughput, lower `encoder_threads`, use `h264`, use `display_data=False`. If the CPU continues to be at 100% then it might be insufficient for your setup, consider `--dataset.streaming_encoding=false` or HW encoding (`--dataset.vcodec=auto`) |
| "Encoder queue full" warnings or dropped frames in dataset | Encoder can't keep up (Queue overflow) | If CPU is not at 100%: Increase `encoder_threads`, increase `encoder_queue_maxsize` or use HW encoding (`--dataset.vcodec=auto`). |
| High RAM usage | Queue filling faster than encoding | `encoder_threads` too low or CPU insufficient. Reduce `encoder_queue_maxsize` or use HW encoding |
| Large video files | Using HW encoder or H.264 | Expected trade-off. Switch to `libsvtav1` if CPU allows |
| `save_episode()` still slow | `streaming_encoding` is `False` | Set `--dataset.streaming_encoding=true` |
| Encoder thread crash | Codec not available or invalid settings | Check `vcodec` is installed, try `--dataset.vcodec=auto` |
| Recorded dataset is missing frames | CPU/GPU starvation or occasional load spikes | If ~5% of frames are missing, your system is likely overloaded — follow the recommendations above. If fewer frames are missing (~2%), they are probably due to occasional transient load spikes (often at startup) and can be considered expected. |
## 6. Recommended Configurations
These estimates are conservative; we recommend testing them on your setup—start with a low load and increase it gradually.
### High-End Systems: modern 12+ cores (24+ threads)
A throughput of ~250-500M px/sec should be comfortable on the CPU. For even better results, try HW encoding if available.
```bash
# 3camsx 1280x720x3 @30fps: Defaults work well. Optionally increase encoder parallelism.
# 2camsx 1920x1080x3 @30fps: Defaults work well. Optionally increase encoder parallelism.
lerobot-record --dataset.encoder_threads=5 ...
# 3camsx 1920x1080x3 @30fps: Might require some tuning.
```
### Mid-Range Systems: modern 8+ cores (16+ threads) or Apple Silicon
A throughput of ~80-300M px/sec should be possible on the CPU.
```bash
# 3camsx 640x480x3 @30fps: Defaults work well. Optionally decrease encoder parallelism.
# 2camsx 1280x720x3 @30fps: Defaults work well. Optionally decrease encoder parallelism.
lerobot-record --dataset.encoder_threads=2 ...
# 2camsx 1920x1080x3 @30fps: Might require some tuning.
```
### Low-Resource Systems: modern 4+ cores (8+ threads) or Raspberry Pi 5
On very constrained systems, streaming encoding may compete too heavily with the capture loop. Disabling it falls back to the PNG-based approach where encoding happens between episodes (blocking, but doesn't interfere with capture). Alternatively, record at a lower throughput to reduce both capture and encoding load. Consider also changing codec to `h264` and using batch encoding.
```bash
# 2camsx 640x480x3 @30fps: Requires some tuning.
# Use H.264, disable streaming, consider batching encoding
lerobot-record --dataset.vcodec=h264 --dataset.streaming_encoding=false ...
```
## 7. Closing note
Performance ultimately depends on your exact setup: frames per second, resolution, CPU cores and load, available memory, episode length, and the encoder you choose. Always test with your target workload, be mindful of your CPU and system capabilities, and tune `encoder_threads`, `encoder_queue_maxsize`, and `vcodec` accordingly. That said, a common practical configuration is three cameras at 640x480x3 @30fps; this usually runs fine with the default streaming video encoding settings on modern systems. Always verify that your recorded dataset is healthy by comparing the video duration to the CLI episode duration and confirming that the row count equals FPS × CLI duration.
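The row-count check can be automated with a few lines. This is a minimal sketch: `check_episode` is a hypothetical helper, and the 2% default tolerance is an assumption based on the troubleshooting table, which treats roughly 2% missing frames as transient and roughly 5% as a sign of overload.

```python
def check_episode(num_rows: int, fps: int, duration_s: float, tolerance: float = 0.02) -> dict:
    """Compare the recorded row count with the expected FPS x duration."""
    expected = round(fps * duration_s)
    missing = max(expected - num_rows, 0)
    missing_ratio = missing / expected if expected else 0.0
    return {
        "expected": expected,
        "missing": missing,
        "missing_ratio": missing_ratio,
        "ok": missing_ratio <= tolerance,
    }


# A 30 s episode at 30 fps should contain 900 rows.
print(check_episode(num_rows=900, fps=30, duration_s=30))
print(check_episode(num_rows=855, fps=30, duration_s=30))
```

Here `num_rows` would come from the episode's frame count in the recorded dataset, and `duration_s` from the episode duration passed on the CLI.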
+10 -4
View File
@@ -216,7 +216,7 @@ lerobot-teleoperate \
### Record Dataset in Simulation
```bash
python -m lerobot.scripts.lerobot_record \
lerobot-record \
--robot.type=unitree_g1 \
--robot.is_simulation=true \
--robot.cameras='{"global_view": {"type": "zmq", "server_address": "localhost", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
@@ -229,7 +229,10 @@ python -m lerobot.scripts.lerobot_record \
--dataset.num_episodes=2 \
--dataset.episode_time_s=5 \
--dataset.reset_time_s=5 \
--dataset.push_to_hub=true
--dataset.push_to_hub=true \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2
# Optional: add --dataset.vcodec=auto to the command above to auto-detect a hardware encoder.
```
Example simulation dataset: [nepyope/teleop_test_sim](https://huggingface.co/datasets/nepyope/teleop_test_sim)
@@ -266,7 +269,7 @@ lerobot-teleoperate \
### Record Dataset on Real Robot
```bash
python -m lerobot.scripts.lerobot_record \
lerobot-record \
--robot.type=unitree_g1 \
--robot.is_simulation=false \
--robot.cameras='{"global_view": {"type": "zmq", "server_address": "172.18.129.215", "port": 5555, "camera_name": "head_camera", "width": 640, "height": 480, "fps": 30}}' \
@@ -279,7 +282,10 @@ python -m lerobot.scripts.lerobot_record \
--dataset.num_episodes=2 \
--dataset.episode_time_s=5 \
--dataset.reset_time_s=5 \
--dataset.push_to_hub=true
--dataset.push_to_hub=true \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2
# Optional: add --dataset.vcodec=auto to the command above to auto-detect a hardware encoder.
```
**Note**: Update `server_address` to match your robot's camera server IP.
+25
View File
@@ -12,6 +12,7 @@ LeRobot provides several utilities for manipulating datasets:
4. **Add Features** - Add new features to a dataset
5. **Remove Features** - Remove features from a dataset
6. **Convert to Video** - Convert image-based datasets to video format for efficient storage
7. **Show Dataset Info** - Show a summary of dataset information, such as the number of episodes.
The core implementation is in `lerobot.datasets.dataset_tools`.
An example script detailing how to use the tools API is available in `examples/dataset/use_dataset_tools.py`.
@@ -156,6 +157,30 @@ lerobot-edit-dataset \
**Note:** The resulting dataset will be a proper LeRobotDataset with all cameras encoded as videos in the `videos/` directory, with parquet files containing only metadata (no raw image data). All episodes, stats, and tasks are preserved.
### Show the information of datasets
Show information about a dataset, such as the number of episodes, the number of frames, and the file size.
No changes are made to the dataset.
```bash
# Show dataset information without feature details
lerobot-edit-dataset \
--repo_id lerobot/pusht_image \
--operation.type info
# Show dataset information with feature details
lerobot-edit-dataset \
--repo_id lerobot/pusht_image \
--operation.type info \
--operation.show_features true
```
**Parameters:**
- `show_features`: Whether to include feature details in the displayed dataset information (default: `false`).
### Push to Hub
Add the `--push_to_hub true` flag to any command to automatically upload the resulting dataset to the Hugging Face Hub:
+1 -1
View File
@@ -45,7 +45,7 @@ policy.type=wall_x
For training WallX, you can use the standard LeRobot training script with the appropriate configuration:
```bash
python src/lerobot/scripts/lerobot_train.py \
lerobot-train \
--dataset.repo_id=your_dataset \
--policy.type=wall_x \
--output_dir=./outputs/wallx_training \
+1 -1
View File
@@ -154,7 +154,7 @@ lerobot-train \
```bash
lerobot-train \
--dataset.repo_id=pepijn223/bimanual-so100-handover-cube \
--dataset.repo_id=<USER>/bimanual-so100-handover-cube \
--output_dir=./outputs/xvla_bimanual \
--job_name=xvla_so101_training \
--policy.path="lerobot/xvla-base" \
+1 -1
View File
@@ -22,7 +22,7 @@ lerobot-replay \
--robot.type=so100_follower \
--robot.port=/dev/tty.usbmodem58760431541 \
--robot.id=black \
--dataset.repo_id=aliberts/record-test \
--dataset.repo_id=<USER>/record-test \
--dataset.episode=2
```
"""
+10 -10
View File
@@ -27,8 +27,8 @@ measuring consistency and ground truth alignment.
Usage:
# Basic usage with smolvla policy
uv run python examples/rtc/eval_dataset.py \
--policy.path=helper2424/smolvla_check_rtc_last3 \
--dataset.repo_id=helper2424/check_rtc \
--policy.path=<USER>/smolvla_check_rtc_last3 \
--dataset.repo_id=<USER>/check_rtc \
--rtc.execution_horizon=8 \
--device=mps \
--rtc.max_guidance_weight=10.0 \
@@ -58,16 +58,16 @@ Usage:
--device=cuda
uv run python examples/rtc/eval_dataset.py \
--policy.path=lipsop/reuben_pi0 \
--dataset.repo_id=ReubenLim/so101_cube_in_cup \
--policy.path=<USER>/reuben_pi0 \
--dataset.repo_id=<USER>/so101_cube_in_cup \
--rtc.execution_horizon=8 \
--device=cuda
# With torch.compile for faster inference (PyTorch 2.0+)
# Note: CUDA graphs disabled by default due to in-place ops in denoising loop
uv run python examples/rtc/eval_dataset.py \
--policy.path=helper2424/smolvla_check_rtc_last3 \
--dataset.repo_id=helper2424/check_rtc \
--policy.path=<USER>/smolvla_check_rtc_last3 \
--dataset.repo_id=<USER>/check_rtc \
--rtc.execution_horizon=8 \
--device=mps \
--use_torch_compile=true \
@@ -75,8 +75,8 @@ Usage:
# With torch.compile on CUDA (CUDA graphs disabled by default)
uv run python examples/rtc/eval_dataset.py \
--policy.path=helper2424/smolvla_check_rtc_last3 \
--dataset.repo_id=helper2424/check_rtc \
--policy.path=<USER>/smolvla_check_rtc_last3 \
--dataset.repo_id=<USER>/check_rtc \
--rtc.execution_horizon=8 \
--device=cuda \
--use_torch_compile=true \
@@ -84,8 +84,8 @@ Usage:
# Enable CUDA graphs (advanced - may cause tensor aliasing errors)
uv run python examples/rtc/eval_dataset.py \
--policy.path=helper2424/smolvla_check_rtc_last3 \
--dataset.repo_id=helper2424/check_rtc \
--policy.path=<USER>/smolvla_check_rtc_last3 \
--dataset.repo_id=<USER>/check_rtc \
--use_torch_compile=true \
--torch_compile_backend=inductor \
--torch_compile_mode=max-autotune \
+3 -3
@@ -28,7 +28,7 @@ For simulation environments, see eval_with_simulation.py
Usage:
# Run RTC with Real robot with RTC
uv run examples/rtc/eval_with_real_robot.py \
--policy.path=helper2424/smolvla_check_rtc_last3 \
--policy.path=<USER>/smolvla_check_rtc_last3 \
--policy.device=mps \
--rtc.enabled=true \
--rtc.execution_horizon=20 \
@@ -41,7 +41,7 @@ Usage:
# Run RTC with Real robot without RTC
uv run examples/rtc/eval_with_real_robot.py \
--policy.path=helper2424/smolvla_check_rtc_last3 \
--policy.path=<USER>/smolvla_check_rtc_last3 \
--policy.device=mps \
--rtc.enabled=false \
--robot.type=so100_follower \
@@ -53,7 +53,7 @@ Usage:
# Run RTC with Real robot with pi0.5 policy
uv run examples/rtc/eval_with_real_robot.py \
--policy.path=helper2424/pi05_check_rtc \
--policy.path=<USER>/pi05_check_rtc \
--policy.device=mps \
--rtc.enabled=true \
--rtc.execution_horizon=20 \
+10 -8
@@ -59,7 +59,7 @@ keywords = ["lerobot", "huggingface", "robotics", "machine learning", "artifici
dependencies = [
# Hugging Face dependencies
"datasets>=4.0.0,<4.2.0",
"datasets>=4.0.0,<5.0.0",
"diffusers>=0.27.2,<0.36.0",
"huggingface-hub[hf-transfer,cli]>=0.34.2,<0.36.0",
"accelerate>=1.10.0,<2.0.0",
@@ -76,9 +76,9 @@ dependencies = [
"pyserial>=3.5,<4.0",
"wandb>=0.24.0,<0.25.0",
"torch>=2.2.1,<2.8.0", # TODO: Bumb dependency
"torchcodec>=0.2.1,<0.6.0; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')", # TODO: Bumb dependency
"torchvision>=0.21.0,<0.23.0", # TODO: Bumb dependency
"torch>=2.2.1,<2.11.0", # TODO: Bump dependency
"torchcodec>=0.2.1,<0.11.0; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')", # TODO: Bump dependency
"torchvision>=0.21.0,<0.26.0", # TODO: Bump dependency
"draccus==0.10.0", # TODO: Remove ==
"gymnasium>=1.1.1,<2.0.0",
@@ -98,11 +98,13 @@ pygame-dep = ["pygame>=2.5.1,<2.7.0"]
placo-dep = ["placo>=0.9.6,<0.10.0"]
transformers-dep = ["transformers>=4.57.1,<5.0.0"]
grpcio-dep = ["grpcio==1.73.1", "protobuf>=6.31.1,<6.32.0"]
can-dep = ["python-can>=4.2.0,<5.0.0"]
# Motors
feetech = ["feetech-servo-sdk>=1.0.0,<2.0.0"]
dynamixel = ["dynamixel-sdk>=3.7.31,<3.9.0"]
damiao = ["python-can>=4.2.0,<5.0.0"]
damiao = ["lerobot[can-dep]"]
robstride = ["lerobot[can-dep]"]
# Robots
openarms = ["lerobot[damiao]"]
@@ -360,9 +362,9 @@ ignore_errors = false
module = "lerobot.cameras.*"
ignore_errors = false
# [[tool.mypy.overrides]]
# module = "lerobot.motors.*"
# ignore_errors = false
[[tool.mypy.overrides]]
module = "lerobot.motors.*"
ignore_errors = false
# [[tool.mypy.overrides]]
# module = "lerobot.robots.*"
+1 -1
@@ -13,5 +13,5 @@
# limitations under the License.
from .camera import Camera
from .configs import CameraConfig, ColorMode, Cv2Rotation
from .configs import CameraConfig, ColorMode, Cv2Backends, Cv2Rotation
from .utils import make_cameras_from_configs
+1 -1
@@ -150,7 +150,7 @@ class Camera(abc.ABC):
"""
pass
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
def read_latest(self, max_age_ms: int = 500) -> NDArray[Any]:
"""Return the most recent frame captured immediately (Peeking).
This method is non-blocking and returns whatever is currently in the
+23
@@ -25,6 +25,10 @@ class ColorMode(str, Enum):
RGB = "rgb"
BGR = "bgr"
@classmethod
def _missing_(cls, value: object) -> None:
raise ValueError(f"`color_mode` is expected to be in {list(cls)}, but {value} is provided.")
class Cv2Rotation(int, Enum):
NO_ROTATION = 0
@@ -32,6 +36,25 @@ class Cv2Rotation(int, Enum):
ROTATE_180 = 180
ROTATE_270 = -90
@classmethod
def _missing_(cls, value: object) -> None:
raise ValueError(f"`rotation` is expected to be in {list(cls)}, but {value} is provided.")
# Subset from https://docs.opencv.org/3.4/d4/d15/group__videoio__flags__base.html
class Cv2Backends(int, Enum):
ANY = 0
V4L2 = 200
DSHOW = 700
PVAPI = 800
ANDROID = 1000
AVFOUNDATION = 1200
MSMF = 1400
@classmethod
def _missing_(cls, value: object) -> None:
raise ValueError(f"`backend` is expected to be in {list(cls)}, but {value} is provided.")
@dataclass(kw_only=True)
class CameraConfig(draccus.ChoiceRegistry, abc.ABC): # type: ignore # TODO: add type stubs for draccus
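The `_missing_` hooks added above move value validation into the enums themselves, which is what lets the camera configs replace their hand-written membership checks with a plain `ColorMode(...)` call. A minimal standalone sketch of that pattern follows; the enum is copied from the hunk, while `normalize_color_mode` is a hypothetical helper added only for illustration.

```python
from enum import Enum


class ColorMode(str, Enum):
    RGB = "rgb"
    BGR = "bgr"

    @classmethod
    def _missing_(cls, value: object) -> None:
        # Enum calls this hook only when `value` matches no member,
        # so raising here replaces the default "not a valid ColorMode" message.
        raise ValueError(f"`color_mode` is expected to be in {list(cls)}, but {value} is provided.")


def normalize_color_mode(value: object) -> ColorMode:
    """Hypothetical helper: coerce a raw config value, or raise ValueError."""
    return ColorMode(value)


if __name__ == "__main__":
    print(normalize_color_mode("rgb"))  # a valid string is mapped to its member
    try:
        normalize_color_mode("xyz")
    except ValueError as err:
        print(err)
```

Passing an existing member through `ColorMode(...)` returns it unchanged, so the coercion in `__post_init__` is safe whether the config was built from a string or from the enum.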
+10 -15
@@ -32,10 +32,11 @@ if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2 # type: ignore # TODO: add type stubs for OpenCV
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.errors import DeviceNotConnectedError
from ..camera import Camera
from ..utils import get_cv2_backend, get_cv2_rotation
from ..utils import get_cv2_rotation
from .configuration_opencv import ColorMode, OpenCVCameraConfig
# NOTE(Steven): The maximum opencv device index depends on your operating system. For instance,
@@ -117,7 +118,7 @@ class OpenCVCamera(Camera):
self.new_frame_event: Event = Event()
self.rotation: int | None = get_cv2_rotation(config.rotation)
self.backend: int = get_cv2_backend()
self.backend: int = config.backend
if self.height and self.width:
self.capture_width, self.capture_height = self.width, self.height
@@ -132,6 +133,7 @@ class OpenCVCamera(Camera):
"""Checks if the camera is currently connected and opened."""
return isinstance(self.videocapture, cv2.VideoCapture) and self.videocapture.isOpened()
@check_if_already_connected
def connect(self, warmup: bool = True) -> None:
"""
Connects to the OpenCV camera specified in the configuration.
@@ -148,8 +150,6 @@ class OpenCVCamera(Camera):
ConnectionError: If the specified camera index/path is not found or fails to open.
RuntimeError: If the camera opens but fails to apply requested settings.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
# Use 1 thread for OpenCV operations to avoid potential conflicts or
# blocking in multi-threaded applications, especially during data collection.
@@ -178,6 +178,7 @@ class OpenCVCamera(Camera):
logger.info(f"{self} connected.")
@check_if_not_connected
def _configure_capture_settings(self) -> None:
"""
Applies the specified FOURCC, FPS, width, and height settings to the connected camera.
@@ -197,8 +198,6 @@ class OpenCVCamera(Camera):
to the requested value.
DeviceNotConnectedError: If the camera is not connected.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.")
# Set FOURCC first (if specified) as it can affect available FPS/resolution options
if self.config.fourcc is not None:
@@ -348,6 +347,7 @@ class OpenCVCamera(Camera):
return frame
@check_if_not_connected
def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
"""
Reads a single frame synchronously from the camera.
@@ -374,9 +374,6 @@ class OpenCVCamera(Camera):
f"{self} read() color_mode parameter is deprecated and will be removed in future versions."
)
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -490,6 +487,7 @@ class OpenCVCamera(Camera):
self.latest_timestamp = None
self.new_frame_event.clear()
@check_if_not_connected
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Reads the latest available frame asynchronously.
@@ -512,8 +510,6 @@ class OpenCVCamera(Camera):
TimeoutError: If no frame becomes available within the specified timeout.
RuntimeError: If an unexpected error occurs.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -533,7 +529,8 @@ class OpenCVCamera(Camera):
return frame
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
@check_if_not_connected
def read_latest(self, max_age_ms: int = 500) -> NDArray[Any]:
"""Return the most recent frame captured immediately (Peeking).
This method is non-blocking and returns whatever is currently in the
@@ -548,8 +545,6 @@ class OpenCVCamera(Camera):
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
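The hunks above replace the repeated inline `is_connected` checks with `check_if_already_connected` and `check_if_not_connected` from `lerobot.utils.decorators`. That module is not part of this diff, so the following is only a sketch of how such decorators could work, assuming they read an `is_connected` property and raise the same errors the inline checks did. `FakeCamera` and the two stand-in exception classes are hypothetical.

```python
import functools
from typing import Any, Callable


class DeviceNotConnectedError(ConnectionError):
    """Stand-in for lerobot.utils.errors.DeviceNotConnectedError."""


class DeviceAlreadyConnectedError(ConnectionError):
    """Stand-in for lerobot.utils.errors.DeviceAlreadyConnectedError."""


def check_if_not_connected(func: Callable[..., Any]) -> Callable[..., Any]:
    """Assumed behavior: refuse to run the method unless the device is connected."""

    @functools.wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if not self.is_connected:
            raise DeviceNotConnectedError(f"{self} is not connected.")
        return func(self, *args, **kwargs)

    return wrapper


def check_if_already_connected(func: Callable[..., Any]) -> Callable[..., Any]:
    """Assumed behavior: refuse to run the method if the device is already connected."""

    @functools.wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if self.is_connected:
            raise DeviceAlreadyConnectedError(f"{self} is already connected.")
        return func(self, *args, **kwargs)

    return wrapper


class FakeCamera:
    """Hypothetical device used only to exercise the decorators."""

    def __init__(self) -> None:
        self._connected = False

    @property
    def is_connected(self) -> bool:
        return self._connected

    @check_if_already_connected
    def connect(self) -> None:
        self._connected = True

    @check_if_not_connected
    def read(self) -> str:
        return "frame"
```

One consequence of this design is that the guard runs before any code in the method body, so checks that previously came after a deprecation warning or other setup now fire first.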
@@ -15,9 +15,9 @@
from dataclasses import dataclass
from pathlib import Path
from ..configs import CameraConfig, ColorMode, Cv2Rotation
from ..configs import CameraConfig, ColorMode, Cv2Backends, Cv2Rotation
__all__ = ["OpenCVCameraConfig", "ColorMode", "Cv2Rotation"]
__all__ = ["OpenCVCameraConfig", "ColorMode", "Cv2Rotation", "Cv2Backends"]
@CameraConfig.register_subclass("opencv")
@@ -50,6 +50,7 @@ class OpenCVCameraConfig(CameraConfig):
rotation: Image rotation setting (0°, 90°, 180°, or 270°). Defaults to no rotation.
warmup_s: Time reading frames before returning from connect (in seconds)
fourcc: FOURCC code for video format (e.g., "MJPG", "YUYV", "I420"). Defaults to None (auto-detect).
backend: OpenCV backend identifier (https://docs.opencv.org/3.4/d4/d15/group__videoio__flags__base.html). Defaults to ANY.
Note:
- Only 3-channel color output (RGB/BGR) is currently supported.
@@ -62,22 +63,12 @@ class OpenCVCameraConfig(CameraConfig):
rotation: Cv2Rotation = Cv2Rotation.NO_ROTATION
warmup_s: int = 1
fourcc: str | None = None
backend: Cv2Backends = Cv2Backends.ANY
def __post_init__(self) -> None:
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided."
)
if self.rotation not in (
Cv2Rotation.NO_ROTATION,
Cv2Rotation.ROTATE_90,
Cv2Rotation.ROTATE_180,
Cv2Rotation.ROTATE_270,
):
raise ValueError(
f"`rotation` is expected to be in {(Cv2Rotation.NO_ROTATION, Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_180, Cv2Rotation.ROTATE_270)}, but {self.rotation} is provided."
)
self.color_mode = ColorMode(self.color_mode)
self.rotation = Cv2Rotation(self.rotation)
self.backend = Cv2Backends(self.backend)
if self.fourcc is not None and (not isinstance(self.fourcc, str) or len(self.fourcc) != 4):
raise ValueError(
@@ -74,7 +74,4 @@ class Reachy2CameraConfig(CameraConfig):
f"`image_type` is expected to be 'left' or 'right' for teleop camera, and 'rgb' or 'depth' for depth camera, but {self.image_type} is provided."
)
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
self.color_mode = ColorMode(self.color_mode)
@@ -32,6 +32,7 @@ if platform.system() == "Windows" and "OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"
import cv2 # type: ignore # TODO: add type stubs for OpenCV
import numpy as np # type: ignore # TODO: add type stubs for numpy
from lerobot.utils.decorators import check_if_not_connected
from lerobot.utils.import_utils import _reachy2_sdk_available
if TYPE_CHECKING or _reachy2_sdk_available:
@@ -123,6 +124,7 @@ class Reachy2Camera(Camera):
"""
raise NotImplementedError("Camera detection is not implemented for Reachy2 cameras.")
@check_if_not_connected
def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
"""
Reads a single frame synchronously from the camera.
@@ -136,9 +138,6 @@ class Reachy2Camera(Camera):
"""
start_time = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.cam_manager is None:
raise DeviceNotConnectedError(f"{self} is not connected.")
@@ -184,6 +183,7 @@ class Reachy2Camera(Camera):
return frame
@check_if_not_connected
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Same as read()
@@ -197,12 +197,11 @@ class Reachy2Camera(Camera):
TimeoutError: If no frame becomes available within the specified timeout.
RuntimeError: If an unexpected error occurs.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
return self.read()
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
@check_if_not_connected
def read_latest(self, max_age_ms: int = 500) -> NDArray[Any]:
"""Return the most recent frame captured immediately (Peeking).
This method is non-blocking and returns whatever is currently in the
@@ -219,8 +218,6 @@ class Reachy2Camera(Camera):
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.latest_frame is None or self.latest_timestamp is None:
raise RuntimeError(f"{self} has not captured any frames yet.")
@@ -233,6 +230,7 @@ class Reachy2Camera(Camera):
return self.latest_frame
@check_if_not_connected
def disconnect(self) -> None:
"""
Stops the background read thread (if running).
@@ -240,8 +238,6 @@ class Reachy2Camera(Camera):
Raises:
DeviceNotConnectedError: If the camera is already disconnected.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} not connected.")
if self.cam_manager is not None:
self.cam_manager.disconnect()
@@ -30,7 +30,8 @@ try:
except Exception as e:
logging.info(f"Could not import realsense: {e}")
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.errors import DeviceNotConnectedError
from ..camera import Camera
from ..configs import ColorMode
@@ -152,6 +153,7 @@ class RealSenseCamera(Camera):
"""Checks if the camera pipeline is started and streams are active."""
return self.rs_pipeline is not None and self.rs_profile is not None
@check_if_already_connected
def connect(self, warmup: bool = True) -> None:
"""
Connects to the RealSense camera specified in the configuration.
@@ -169,8 +171,6 @@ class RealSenseCamera(Camera):
ConnectionError: If the camera is found but fails to start the pipeline or no RealSense devices are detected at all.
RuntimeError: If the pipeline starts but fails to apply requested settings.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
self.rs_pipeline = rs.pipeline()
rs_config = rs.config()
@@ -290,6 +290,7 @@ class RealSenseCamera(Camera):
if self.use_depth:
rs_config.enable_stream(rs.stream.depth)
@check_if_not_connected
def _configure_capture_settings(self) -> None:
"""Sets fps, width, and height from device stream if not already configured.
@@ -299,8 +300,6 @@ class RealSenseCamera(Camera):
Raises:
DeviceNotConnectedError: If device is not connected.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"Cannot validate settings for {self} as it is not connected.")
if self.rs_profile is None:
raise RuntimeError(f"{self}: rs_profile must be initialized before use.")
@@ -320,6 +319,7 @@ class RealSenseCamera(Camera):
self.width, self.height = actual_width, actual_height
self.capture_width, self.capture_height = actual_width, actual_height
@check_if_not_connected
def read_depth(self, timeout_ms: int = 200) -> NDArray[Any]:
"""
Reads a single frame (depth) synchronously from the camera.
@@ -345,9 +345,6 @@ class RealSenseCamera(Camera):
f"Failed to capture depth frame '.read_depth()'. Depth stream is not enabled for {self}."
)
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -374,6 +371,7 @@ class RealSenseCamera(Camera):
return frame
@check_if_not_connected
def read(self, color_mode: ColorMode | None = None, timeout_ms: int = 0) -> NDArray[Any]:
"""
Reads a single frame (color) synchronously from the camera.
@@ -403,9 +401,6 @@ class RealSenseCamera(Camera):
f"{self} read() timeout_ms parameter is deprecated and will be removed in future versions."
)
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -534,6 +529,7 @@ class RealSenseCamera(Camera):
self.new_frame_event.clear()
# NOTE(Steven): Missing implementation for depth for now
@check_if_not_connected
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Reads the latest available frame data (color) asynchronously.
@@ -556,8 +552,6 @@ class RealSenseCamera(Camera):
TimeoutError: If no frame data becomes available within the specified timeout.
RuntimeError: If the background thread died unexpectedly or another error occurs.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -578,7 +572,8 @@ class RealSenseCamera(Camera):
return frame
# NOTE(Steven): Missing implementation for depth for now
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
@check_if_not_connected
def read_latest(self, max_age_ms: int = 500) -> NDArray[Any]:
"""Return the most recent (color) frame captured immediately (Peeking).
This method is non-blocking and returns whatever is currently in the
@@ -593,8 +588,6 @@ class RealSenseCamera(Camera):
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -60,20 +60,8 @@ class RealSenseCameraConfig(CameraConfig):
warmup_s: int = 1
def __post_init__(self) -> None:
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided."
)
if self.rotation not in (
Cv2Rotation.NO_ROTATION,
Cv2Rotation.ROTATE_90,
Cv2Rotation.ROTATE_180,
Cv2Rotation.ROTATE_270,
):
raise ValueError(
f"`rotation` is expected to be in {(Cv2Rotation.NO_ROTATION, Cv2Rotation.ROTATE_90, Cv2Rotation.ROTATE_180, Cv2Rotation.ROTATE_270)}, but {self.rotation} is provided."
)
self.color_mode = ColorMode(self.color_mode)
self.rotation = Cv2Rotation(self.rotation)
values = (self.fps, self.width, self.height)
if any(v is not None for v in values) and any(v is None for v in values):
-12
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import platform
from typing import cast
from lerobot.utils.import_utils import make_device_from_device_class
@@ -68,14 +67,3 @@ def get_cv2_rotation(rotation: Cv2Rotation) -> int | None:
return int(cv2.ROTATE_90_COUNTERCLOCKWISE)
else:
return None
def get_cv2_backend() -> int:
import cv2
if platform.system() == "Windows":
return int(cv2.CAP_MSMF) # Use MSMF for Windows instead of AVFOUNDATION
# elif platform.system() == "Darwin": # macOS
# return cv2.CAP_AVFOUNDATION
else: # Linux and others
return int(cv2.CAP_ANY)
+6 -10
@@ -34,7 +34,8 @@ import cv2
import numpy as np
from numpy.typing import NDArray
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.errors import DeviceNotConnectedError
from ..camera import Camera
from ..configs import ColorMode
@@ -104,6 +105,7 @@ class ZMQCamera(Camera):
"""Checks if the ZMQ socket is initialized and connected."""
return self._connected and self.context is not None and self.socket is not None
@check_if_already_connected
def connect(self, warmup: bool = True) -> None:
"""Connect to ZMQ camera server.
@@ -111,8 +113,6 @@ class ZMQCamera(Camera):
warmup (bool): If True, waits for the camera to provide at least one
valid frame before returning. Defaults to True.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
logger.info(f"Connecting to {self}...")
@@ -211,6 +211,7 @@ class ZMQCamera(Camera):
return frame
@check_if_not_connected
def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
"""
Reads a single frame synchronously from the camera.
@@ -228,9 +229,6 @@ class ZMQCamera(Camera):
f"{self} read() color_mode parameter is deprecated and will be removed in future versions."
)
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -301,6 +299,7 @@ class ZMQCamera(Camera):
self.latest_timestamp = None
self.new_frame_event.clear()
@check_if_not_connected
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Reads the latest available frame asynchronously.
@@ -317,8 +316,6 @@ class ZMQCamera(Camera):
TimeoutError: If no frame data becomes available within the specified timeout.
RuntimeError: If the background thread is not running.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
@@ -335,6 +332,7 @@ class ZMQCamera(Camera):
return frame
@check_if_not_connected
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
"""Return the most recent frame captured immediately (Peeking).
@@ -350,8 +348,6 @@ class ZMQCamera(Camera):
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
+1 -4
@@ -32,10 +32,7 @@ class ZMQCameraConfig(CameraConfig):
warmup_s: int = 1
def __post_init__(self) -> None:
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"`color_mode` is expected to be {ColorMode.RGB.value} or {ColorMode.BGR.value}, but {self.color_mode} is provided."
)
self.color_mode = ColorMode(self.color_mode)
if self.timeout_ms <= 0:
raise ValueError(f"`timeout_ms` must be positive, but {self.timeout_ms} is provided.")
+2
@@ -38,6 +38,8 @@ class EvalPipelineConfig:
seed: int | None = 1000
# Rename map for the observation to override the image and state keys
rename_map: dict[str, str] = field(default_factory=dict)
# Additional kwargs to pass to hub environments (e.g., config_path, config_overrides, custom params)
env_kwargs: dict = field(default_factory=dict)
# Explicit consent to execute remote code from the Hub (required for hub environments).
trust_remote_code: bool = False
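The new `env_kwargs` field uses `field(default_factory=dict)` so that each config instance gets its own dictionary. A small sketch of why that matters and how such kwargs might be forwarded is shown below; `EvalConfigSketch` and `make_env_sketch` are hypothetical names, and the real forwarding logic for hub environments is not shown in this diff.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class EvalConfigSketch:
    """Hypothetical, trimmed-down stand-in for EvalPipelineConfig."""

    seed: Optional[int] = 1000
    # default_factory builds a fresh dict per instance; a bare `{}` default
    # is rejected by dataclasses because it would be shared between instances.
    env_kwargs: Dict[str, Any] = field(default_factory=dict)


def make_env_sketch(name: str, **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical factory: records the extra kwargs it was given."""
    return {"name": name, **kwargs}


if __name__ == "__main__":
    cfg = EvalConfigSketch(env_kwargs={"config_path": "my_env.yaml"})
    print(make_env_sketch("hub_env", **cfg.env_kwargs))
```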
@@ -1,50 +0,0 @@
#!/bin/bash
# Example script to run synthetic data generation with Qwen VLM
# This generates user prompts and robot utterances for hierarchical policy training
# Configuration
REPO_ID="lerobot/libero_10"
MODEL="Qwen/Qwen3-VL-30B-A3B-Instruct"
# or: MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/fsx/jade_choghari/outputs/libero-10-annotate-high"
BATCH_SIZE=16
TEMPERATURE=0.9
SAMPLE_INTERVAL=5.0 # generate dialogue every 5 seconds (all episodes processed)
# Run subtask annotation
# python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
# --repo-id "$REPO_ID" \
# --video-key observation.images.image \
# --output-dir "$OUTPUT_DIR" \
# --skip-existing \
# --output-repo-id "jadechoghari/libero10-annotate" \
# --batch-size "$BATCH_SIZE" \
# run synthetic data generation (all episodes processed)
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --temperature "$TEMPERATURE" \
# --batch-size "$BATCH_SIZE" \
# --sample-interval "$SAMPLE_INTERVAL" \
# --image-key observation.images.base \
# --num-image-views-per-sample 1
# for faster testing, increase sample interval:
# --sample-interval 5.0 # Samples every 5 seconds (much faster)
# to push to hub after generation:
# add --push-to-hub flag
# efficient batch processing: 4 episodes at once
python src/lerobot/data_processing/annotations/high_level_annotate.py \
--data-dir "/fsx/jade_choghari/outputs/libero-10-annotate" \
--output-dir "$OUTPUT_DIR" \
--video-mode \
--video-key observation.images.image \
--video-batch-size "$BATCH_SIZE" \
--sample-interval 5.0
File diff suppressed because it is too large
@@ -1,52 +0,0 @@
import torch
from huggingface_hub import HfApi
import lerobot
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.policies.factory import make_pre_post_processors
from lerobot.configs.policies import PreTrainedConfig
# /fsx/jade_choghari/data/libero_10_subtasks_kw_converted
dataset = LeRobotDataset(repo_id="lerobot/libero_10_image_subtask")
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=0,
batch_size=2,
shuffle=True,
)
cfg = PreTrainedConfig.from_pretrained(
pretrained_name_or_path="/fsx/jade_choghari/models/pi05-base",
)
cfg.dtype = "bfloat16"
pre_processor, post_processor = make_pre_post_processors(
policy_cfg=cfg,
pretrained_path="/fsx/jade_choghari/models/pi05-base",
)
batch = next(iter(dataloader))
breakpoint()
batch1 = pre_processor(batch)
breakpoint()
print(batch.keys())
# print(batch['task_index_high_level'].shape)
# print(batch['task_index_high_level'])
# print(batch['user_prompt'][0])
# print(batch['robot_utterance'][0])
# print(batch['task'][0])
valid_episode_list = []
for episode_idx in range(len(dataset.meta.episodes)):
subtask_index = dataset[episode_idx]["subtask_index"]
valid_episode_list.append(episode_idx)
print(len(valid_episode_list))
# read this parquet /fsx/jade_choghari/outputs/pgen_annotations1/meta/tasks.parquet
# import pandas as pd
# tasks_df = pd.read_parquet('/fsx/jade_choghari/outputs/pgen_annotations1/meta/tasks.parquet')
# # print all
# print(tasks_df.columns)
# breakpoint()
@@ -1,74 +0,0 @@
#!/bin/bash
# Example script to run synthetic data generation with Qwen VLM
# This generates user prompts and robot utterances for hierarchical policy training
# Configuration
REPO_ID="jadechoghari/piper-demo-20260205_103303"
# MODEL="Qwen/Qwen3-VL-30B-A3B-Thinking"
MODEL="Qwen/Qwen3.5-27B"
# or: MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/fsx/jade_choghari/outputs/collect-data-pgen_new"
BATCH_SIZE=2
TEMPERATURE=0.9
SAMPLE_INTERVAL=5.0 # generate dialogue every 5 seconds (all episodes processed)
# Run subtask annotation.
# To use closed-vocabulary labels, add a line: --subtask-labels "label1" "label2" ...
# Example (add backslash after "$MODEL" and uncomment the next line):
# --model "$MODEL" \
# --subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can"
python /home/lerobot/src/lerobot/data_processing/annotations/subtask_annotate.py \
--repo-id "$REPO_ID" \
--video-key observation.images.top \
--output-dir "$OUTPUT_DIR" \
--output-repo-id "jadechoghari/piper-demo-annotated1" \
--push-to-hub \
--no-timer-overlay \
--model "$MODEL" \
--subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can" \
--batch-size 2
# Run subtask annotation (image-window: frames as images for better accuracy)
# python /admin/home/jade_choghari/lerobot/src/lerobot/data_processing/annotations/subtask_annotate_image.py \
# --repo-id "$REPO_ID" \
# --camera-key observation.images.wrist \
# --output-dir "$OUTPUT_DIR" \
# --output-repo-id "jadechoghari/piper-demo-annotated1-image" \
# --push-to-hub \
# --model "$MODEL" \
# --window-size 184 \
# --max-frames-per-window 16 \
# --subtask-labels "pick_up_yellow_nut_bar" "pick_up_cake" "pick_up_biscuit_pack" "pick_up_soda_can" \
# --batch-size 2
# run synthetic data generation (all episodes processed)
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --temperature "$TEMPERATURE" \
# --batch-size "$BATCH_SIZE" \
# --sample-interval "$SAMPLE_INTERVAL" \
# --image-key observation.images.base \
# --num-image-views-per-sample 1
# for faster testing, increase sample interval:
# --sample-interval 5.0 # Samples every 5 seconds (much faster)
# to push to hub after generation:
# add --push-to-hub flag
# efficient batch processing: 4 episodes at once
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --video-mode \
# --video-key observation.images.up \
# --video-batch-size "$BATCH_SIZE" \
# --sample-interval 1.0
File diff suppressed because it is too large
@@ -1,561 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Image-window subtask annotation for LeRobot datasets using Qwen VLMs.
This script assigns a subtask to each window of consecutive frames by sending
those frames as images to the VLM (instead of a video) for better accuracy.
Supports Qwen2-VL and Qwen3-VL (same models as subtask_annotate.py).
Pipeline:
1. Load a LeRobot dataset (local or Hub).
2. For each episode, slide a window over frame indices.
3. For each window, load the corresponding images (from image_key or decoded video_key).
4. Send the window of images to Qwen2-VL with the same skill prompt; get one subtask name.
5. Assign that subtask to all frames in the window.
6. Write subtasks.parquet and add subtask_index via add_features (same as subtask_annotate).
Usage:
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--data-dir /path/to/dataset --camera-key observation.images.base \\
--window-size 8 --stride 8 --output-dir ./output
"""
from __future__ import annotations
import argparse
import random
import textwrap
from pathlib import Path
import numpy as np
import PIL.Image
import torch
from rich.console import Console
from lerobot.datasets.lerobot_dataset import LeRobotDataset
# Reuse data structures and save/load from the video-based annotator
from lerobot.data_processing.annotations.subtask_annotate import (
EpisodeSkills,
Skill,
load_skill_annotations,
save_skill_annotations,
)
def create_window_skill_prompt(
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> str:
"""Prompt for labeling a single window of frames with one atomic skill.
If subtask_labels are provided, the model must choose exactly one from that list.
"""
goal_context = f'The overall goal is: "{coarse_goal}".\n\n' if coarse_goal else ""
if subtask_labels:
labels_list = ", ".join(f'"{l}"' for l in subtask_labels)
label_instruction = (
f"You must choose exactly ONE skill from this list: [{labels_list}]. "
"Do not create new labels. Reply with only that label.\n\n"
)
else:
label_instruction = ""
return textwrap.dedent(f"""\
# Role
You are a Robotics Vision System that labels short clips from robot manipulation demonstrations.
# Task
{goal_context}{label_instruction}The following images are consecutive frames from a single short clip of a robot demonstration.
What single atomic manipulation skill is being performed in this clip?
# Requirements
- Reply with ONLY one short skill name (e.g. "pick up object", "move arm left", "release gripper").
- No explanation, no timestamps, no JSON. Just the skill name.
""").strip()
def _run_image_segmenter(
self,
images: list[PIL.Image.Image],
coarse_goal: str | None,
subtask_labels: list[str] | None = None,
) -> str:
"""Shared inference for Qwen2-VL and Qwen3-VL image window labeling."""
prompt = create_window_skill_prompt(coarse_goal, subtask_labels)
content = []
for img in images:
content.append({"type": "image", "image": img})
content.append({"type": "text", "text": "What single atomic skill is shown in these frames? Reply with only the skill name."})
messages = [
{"role": "system", "content": [{"type": "text", "text": prompt}]},
{"role": "user", "content": content},
]
text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = self.process_vision_info(messages)
inputs = self.processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
).to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(**inputs, max_new_tokens=128, do_sample=False)
response = self.processor.batch_decode(
[out[len(inp) :] for inp, out in zip(inputs.input_ids, generated_ids)],
skip_special_tokens=True,
)[0].strip()
skill_name = response.split("\n")[0].strip().strip('."')
return skill_name if skill_name else "unknown"
def _run_image_segmenter_batch(
self,
batch_images: list[list[PIL.Image.Image]],
coarse_goal: str | None,
subtask_labels: list[str] | None = None,
) -> list[str]:
"""Run VLM on multiple windows at once; returns one skill name per window."""
if not batch_images:
return []
prompt = create_window_skill_prompt(coarse_goal, subtask_labels)
all_texts = []
all_image_inputs = []
all_video_inputs = []
for images in batch_images:
content = []
for img in images:
content.append({"type": "image", "image": img})
content.append({"type": "text", "text": "What single atomic skill is shown in these frames? Reply with only the skill name."})
messages = [
{"role": "system", "content": [{"type": "text", "text": prompt}]},
{"role": "user", "content": content},
]
text = self.processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = self.process_vision_info(messages)
all_texts.append(text)
if image_inputs is not None:
all_image_inputs.extend(image_inputs if isinstance(image_inputs, list) else [image_inputs])
if video_inputs is not None:
all_video_inputs.extend(video_inputs if isinstance(video_inputs, list) else [video_inputs])
inputs = self.processor(
text=all_texts,
images=all_image_inputs if all_image_inputs else None,
videos=all_video_inputs if all_video_inputs else None,
padding=True,
return_tensors="pt",
).to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(**inputs, max_new_tokens=128, do_sample=False)
responses = self.processor.batch_decode(
[out[len(inp) :] for inp, out in zip(inputs.input_ids, generated_ids)],
skip_special_tokens=True,
)
return [
(r.split("\n")[0].strip().strip('."') or "unknown")
for r in responses
]
class Qwen2VLImageSegmenter:
"""Uses Qwen2-VL to assign one skill name to a window of images (same model as subtask_annotate)."""
def __init__(self, model_name: str, device: str = "cuda", torch_dtype: torch.dtype = torch.bfloat16):
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration
self.console = Console()
self.device = device
self.process_vision_info = process_vision_info
self.console.print(f"[cyan]Loading Qwen2-VL for image-window labeling: {model_name}...[/cyan]")
self.model = Qwen2VLForConditionalGeneration.from_pretrained(
model_name, torch_dtype=torch_dtype, device_map=device, trust_remote_code=True
)
self.processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
self.console.print(f"[green]✓ Model loaded on {device}[/green]")
def segment_skill_from_images(
self,
images: list[PIL.Image.Image],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> str:
"""Return a single skill name for the given window of images."""
return _run_image_segmenter(self, images, coarse_goal, subtask_labels)
def segment_skill_from_images_batch(
self,
batch_images: list[list[PIL.Image.Image]],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> list[str]:
"""Return one skill name per window; processes multiple windows in one forward pass."""
return _run_image_segmenter_batch(self, batch_images, coarse_goal, subtask_labels)
class Qwen3VLImageSegmenter:
"""Uses Qwen3-VL (MoE) to assign one skill name to a window of images."""
def __init__(self, model_name: str, device: str = "cuda", torch_dtype: torch.dtype = torch.bfloat16):
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor, Qwen3VLMoeForConditionalGeneration
self.console = Console()
self.device = device
self.process_vision_info = process_vision_info
self.console.print(f"[cyan]Loading Qwen3-VL for image-window labeling: {model_name}...[/cyan]")
self.model = Qwen3VLMoeForConditionalGeneration.from_pretrained(
model_name, torch_dtype=torch_dtype, device_map=device, trust_remote_code=True
)
self.processor = AutoProcessor.from_pretrained(model_name, trust_remote_code=True)
self.console.print(f"[green]✓ Model loaded on {device}[/green]")
def segment_skill_from_images(
self,
images: list[PIL.Image.Image],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> str:
"""Return a single skill name for the given window of images."""
return _run_image_segmenter(self, images, coarse_goal, subtask_labels)
def segment_skill_from_images_batch(
self,
batch_images: list[list[PIL.Image.Image]],
coarse_goal: str | None = None,
subtask_labels: list[str] | None = None,
) -> list[str]:
"""Return one skill name per window; processes multiple windows in one forward pass."""
return _run_image_segmenter_batch(self, batch_images, coarse_goal, subtask_labels)
def get_image_segmenter(
model_name: str,
device: str = "cuda",
torch_dtype: torch.dtype = torch.bfloat16,
):
"""Return the appropriate image-window segmenter for the model (Qwen2-VL or Qwen3-VL)."""
model_lower = model_name.lower()
if "qwen3" in model_lower:
return Qwen3VLImageSegmenter(model_name, device, torch_dtype)
return Qwen2VLImageSegmenter(model_name, device, torch_dtype)
def frame_to_pil(frame_value) -> PIL.Image.Image:
"""Convert a single frame from dataset (tensor or PIL or path) to PIL.Image."""
if isinstance(frame_value, PIL.Image.Image):
return frame_value
if isinstance(frame_value, (str, Path)):
return PIL.Image.open(frame_value).convert("RGB")
if hasattr(frame_value, "numpy"):
arr = frame_value.numpy()
else:
arr = np.asarray(frame_value)
if arr.ndim == 3 and arr.shape[0] in (1, 3, 4):
arr = np.transpose(arr, (1, 2, 0))
if arr.dtype == np.float32 or arr.dtype == np.float64:
arr = (np.clip(arr, 0, 1) * 255).astype(np.uint8)
elif arr.dtype != np.uint8:
arr = np.clip(arr, 0, 255).astype(np.uint8)
if arr.shape[-1] == 1:
arr = np.repeat(arr, 3, axis=-1)
return PIL.Image.fromarray(arr)
def _sample_window_indices(window_length: int, max_frames: int) -> list[int]:
"""Return indices into a window of length window_length, at most max_frames, in order.
If window_length <= max_frames, returns range(window_length).
Otherwise returns sorted random sample of max_frames indices (temporal order preserved).
"""
if max_frames <= 0 or window_length <= max_frames:
return list(range(window_length))
return sorted(random.sample(range(window_length), max_frames))
class SkillAnnotatorImage:
"""Annotates episodes by sliding a window over frames and labeling each window with the VLM."""
def __init__(
self,
segmenter: Qwen2VLImageSegmenter | Qwen3VLImageSegmenter,
window_size: int = 8,
stride: int | None = None,
batch_size: int = 1,
max_frames_per_window: int | None = None,
console: Console | None = None,
):
self.segmenter = segmenter
self.window_size = window_size
self.stride = stride if stride is not None else window_size
self.batch_size = max(1, batch_size)
self.max_frames_per_window = max_frames_per_window
self.console = console or Console()
def annotate_dataset(
self,
dataset: LeRobotDataset,
camera_key: str,
episodes: list[int] | None = None,
skip_existing: bool = False,
subtask_labels: list[str] | None = None,
) -> dict[int, EpisodeSkills]:
"""Annotate episodes using image windows. camera_key can be an image_key or video_key."""
episode_indices = episodes or list(range(dataset.meta.total_episodes))
coarse_goal = self._get_coarse_goal(dataset)
annotations: dict[int, EpisodeSkills] = {}
if skip_existing:
existing = load_skill_annotations(dataset.root)
if existing and existing.get("episodes"):
existing_eps = {int(k) for k in existing["episodes"] if existing["episodes"][k].get("skills")}
episode_indices = [i for i in episode_indices if i not in existing_eps]
for ep_idx in episode_indices:
try:
skills = self._annotate_episode(
dataset, ep_idx, camera_key, coarse_goal, subtask_labels
)
if skills:
annotations[ep_idx] = EpisodeSkills(
episode_index=ep_idx,
description=coarse_goal,
skills=skills,
)
self.console.print(f"[green]✓ Episode {ep_idx}: {len(skills)} window skills[/green]")
else:
self.console.print(f"[yellow]⚠ Episode {ep_idx}: no skills[/yellow]")
except Exception as e:
self.console.print(f"[red]Episode {ep_idx} failed: {e}[/red]")
return annotations
def _get_coarse_goal(self, dataset: LeRobotDataset) -> str:
if dataset.meta.tasks is not None and len(dataset.meta.tasks) > 0:
return str(dataset.meta.tasks.index[0])
return "Perform the demonstrated manipulation task."
def _annotate_episode(
self,
dataset: LeRobotDataset,
episode_index: int,
camera_key: str,
coarse_goal: str,
subtask_labels: list[str] | None = None,
) -> list[Skill]:
ep = dataset.meta.episodes[episode_index]
ep_from = int(ep["dataset_from_index"])
ep_to = int(ep["dataset_to_index"])
length = ep_to - ep_from
fps = dataset.meta.fps
if length == 0:
return []
# Collect full windows: (images, t_start, t_end) using frame timestamps.
# If max_frames_per_window is set and window is larger, sample that many frames (order preserved).
window_specs: list[tuple[list[PIL.Image.Image], float, float]] = []
start = 0
while start + self.window_size <= length:
offsets = _sample_window_indices(
self.window_size,
self.max_frames_per_window or self.window_size,
)
frame_indices = [ep_from + start + i for i in offsets]
images = []
t_start = float(dataset[frame_indices[0]]["timestamp"].item())
for idx in frame_indices:
item = dataset[idx]
images.append(frame_to_pil(item[camera_key]))
t_end = t_start + self.window_size / fps
window_specs.append((images, t_start, t_end))
start += self.stride
# Last partial window
if start < length:
partial_len = ep_to - (ep_from + start)
offsets = _sample_window_indices(
partial_len,
self.max_frames_per_window or partial_len,
)
frame_indices = [ep_from + start + i for i in offsets]
images = []
t_start = float(dataset[frame_indices[0]]["timestamp"].item())
for idx in frame_indices:
item = dataset[idx]
images.append(frame_to_pil(item[camera_key]))
t_end = float(dataset[frame_indices[-1]]["timestamp"].item()) + 1.0 / fps
window_specs.append((images, t_start, t_end))
# Run in batches
skills: list[Skill] = []
for i in range(0, len(window_specs), self.batch_size):
chunk = window_specs[i : i + self.batch_size]
batch_images = [spec[0] for spec in chunk]
if len(batch_images) > 1:
skill_names = self.segmenter.segment_skill_from_images_batch(
batch_images, coarse_goal, subtask_labels
)
else:
skill_names = [
self.segmenter.segment_skill_from_images(
batch_images[0], coarse_goal, subtask_labels
)
]
for (_, t_start, t_end), name in zip(chunk, skill_names, strict=True):
skills.append(Skill(name=name, start=t_start, end=t_end))
return skills
def main():
parser = argparse.ArgumentParser(
description="Image-window subtask annotation using Qwen VLM (frames as images for better accuracy)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent("""\
Examples:
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--data-dir /path/to/dataset --camera-key observation.images.base \\
--window-size 8 --output-dir ./output
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--repo-id user/dataset --camera-key observation.images.base \\
--window-size 6 --stride 3 --model Qwen/Qwen2-VL-7B-Instruct
# Use Qwen3-VL (MoE)
python -m lerobot.data_processing.annotations.subtask_annotate_image \\
--data-dir /path/to/dataset --camera-key observation.images.base \\
--model Qwen/Qwen3-VL-30B-A3B-Instruct
"""),
)
data_group = parser.add_mutually_exclusive_group(required=True)
data_group.add_argument("--data-dir", type=str, help="Path to local LeRobot dataset")
data_group.add_argument("--repo-id", type=str, help="HuggingFace Hub dataset repository ID")
parser.add_argument(
"--camera-key",
type=str,
required=True,
help="Image or video observation key (e.g. observation.images.base)",
)
parser.add_argument(
"--model",
type=str,
default="Qwen/Qwen2-VL-7B-Instruct",
help="VLM model: Qwen2-VL or Qwen3-VL (default: Qwen/Qwen2-VL-7B-Instruct)",
)
parser.add_argument(
"--device",
type=str,
default="cuda",
)
parser.add_argument(
"--window-size",
type=int,
default=8,
help="Number of frames per window (default: 8)",
)
parser.add_argument(
"--stride",
type=int,
default=None,
help="Stride for sliding window (default: window_size = non-overlapping)",
)
parser.add_argument(
"--batch-size",
type=int,
default=1,
help="Number of windows to process in one VLM call (default: 1; increase for speed)",
)
parser.add_argument(
"--max-frames-per-window",
type=int,
default=None,
metavar="N",
help="If window has more than N frames, randomly sample N frames (order kept) to avoid OOM (e.g. 16)",
)
parser.add_argument("--episodes", type=int, nargs="+", help="Episode indices to annotate (default: all)")
parser.add_argument("--skip-existing", action="store_true", help="Skip episodes that already have annotations")
parser.add_argument(
"--subtask-labels",
type=str,
nargs="*",
default=None,
help="Closed vocabulary: model must choose only from these labels",
)
parser.add_argument("--output-dir", type=str, help="Output directory for dataset with subtask_index")
parser.add_argument("--output-repo-id", type=str, help="Output repo id (default: <repo_id>_with_subtasks)")
parser.add_argument("--push-to-hub", action="store_true")
args = parser.parse_args()
console = Console()
# Load dataset
console.print("[cyan]Loading dataset...[/cyan]")
if args.data_dir:
dataset = LeRobotDataset(repo_id="local/dataset", root=args.data_dir, download_videos=False)
else:
dataset = LeRobotDataset(repo_id=args.repo_id, download_videos=True)
camera_keys = dataset.meta.camera_keys
if args.camera_key not in camera_keys:
console.print(f"[red]Error: camera key '{args.camera_key}' not in {camera_keys}[/red]")
return
console.print(f"[green]✓ Loaded dataset, {dataset.meta.total_episodes} episodes[/green]")
# Same Qwen VLM as subtask_annotate (Qwen2-VL or Qwen3-VL), image windows instead of video
segmenter = get_image_segmenter(args.model, args.device, torch.bfloat16)
annotator = SkillAnnotatorImage(
segmenter=segmenter,
window_size=args.window_size,
stride=args.stride,
batch_size=args.batch_size,
max_frames_per_window=args.max_frames_per_window,
console=console,
)
annotations = annotator.annotate_dataset(
dataset=dataset,
camera_key=args.camera_key,
episodes=args.episodes,
skip_existing=args.skip_existing,
subtask_labels=args.subtask_labels,
)
if not annotations:
console.print("[yellow]No annotations to save.[/yellow]")
return
output_dir = Path(args.output_dir) if args.output_dir else None
output_repo_id = args.output_repo_id
new_dataset = save_skill_annotations(dataset, annotations, output_dir, output_repo_id)
total_skills = sum(len(a.skills) for a in annotations.values())
console.print(f"[bold green]✓ Done.[/bold green] Episodes: {len(annotations)}, total window skills: {total_skills}")
console.print(f" Dataset with subtask_index: {new_dataset.root}")
if args.push_to_hub and not args.data_dir:
console.print("[cyan]Pushing to Hub...[/cyan]")
try:
new_dataset.push_to_hub(push_videos=False)
console.print("[green]✓ Pushed.[/green]")
except Exception as e:
console.print(f"[red]Push failed: {e}[/red]")
if __name__ == "__main__":
main()
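The windowing rule used by `_annotate_episode` in the removed file can be sketched in isolation. This is a minimal illustration, not code from the repository: `window_bounds` is a hypothetical helper name, and it only computes frame offsets within an episode (no image loading, no VLM call). It assumes the same rule as the script: full windows of `window_size` frames advanced by `stride`, followed by one trailing partial window if any frames remain.

```python
from __future__ import annotations


def window_bounds(length: int, window_size: int, stride: int | None = None) -> list[tuple[int, int]]:
    """Return half-open (start, end) frame offsets for each window in an episode.

    Hypothetical helper for illustration only; it mirrors the loop structure of
    the removed `_annotate_episode`, but is not part of the codebase.
    """
    if window_size <= 0:
        raise ValueError("window_size must be positive")
    # As in the script, a missing stride means non-overlapping windows.
    stride = window_size if stride is None else stride
    if stride <= 0:
        raise ValueError("stride must be positive")

    bounds: list[tuple[int, int]] = []
    start = 0
    # Full windows: only emitted while a whole window still fits.
    while start + window_size <= length:
        bounds.append((start, start + window_size))
        start += stride
    # Trailing partial window covering whatever frames are left.
    if start < length:
        bounds.append((start, length))
    return bounds


if __name__ == "__main__":
    print(window_bounds(20, 8))        # non-overlapping windows plus a partial tail
    print(window_bounds(10, 6, 3))     # overlapping windows (stride < window_size)
```

With a stride smaller than the window size, consecutive windows overlap, so the resulting skills have overlapping time ranges; whether downstream code expects that is not visible in this diff.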
+113 -38
@@ -59,7 +59,6 @@ from lerobot.datasets.utils import (
load_stats,
load_subtasks,
load_tasks,
load_tasks_high_level,
update_chunk_file_indices,
validate_episode_buffer,
validate_frame,
@@ -69,6 +68,7 @@ from lerobot.datasets.utils import (
write_tasks,
)
from lerobot.datasets.video_utils import (
StreamingVideoEncoder,
VideoFrame,
concatenate_video_files,
decode_video_frames,
@@ -76,11 +76,11 @@ from lerobot.datasets.video_utils import (
get_safe_default_codec,
get_video_duration_in_s,
get_video_info,
resolve_vcodec,
)
from lerobot.utils.constants import HF_LEROBOT_HOME
CODEBASE_VERSION = "v3.0"
VALID_VIDEO_CODECS = {"h264", "hevc", "libsvtav1"}
class LeRobotDatasetMetadata:
@@ -164,7 +164,6 @@ class LeRobotDatasetMetadata:
self.info = load_info(self.root)
check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION)
self.tasks = load_tasks(self.root)
self.tasks_high_level = load_tasks_high_level(self.root)
self.subtasks = load_subtasks(self.root)
self.episodes = load_episodes(self.root)
self.stats = load_stats(self.root)
@@ -522,7 +521,6 @@ class LeRobotDatasetMetadata:
_validate_feature_names(features)
obj.tasks = None
obj.tasks_high_level = None
obj.subtasks = None
obj.episodes = None
obj.stats = None
@@ -548,12 +546,19 @@ class LeRobotDatasetMetadata:
def _encode_video_worker(
video_key: str, episode_index: int, root: Path, fps: int, vcodec: str = "libsvtav1"
video_key: str,
episode_index: int,
root: Path,
fps: int,
vcodec: str = "libsvtav1",
encoder_threads: int | None = None,
) -> Path:
temp_path = Path(tempfile.mkdtemp(dir=root)) / f"{video_key}_{episode_index:03d}.mp4"
fpath = DEFAULT_IMAGE_PATH.format(image_key=video_key, episode_index=episode_index, frame_index=0)
img_dir = (root / fpath).parent
encode_video_frames(img_dir, temp_path, fps, vcodec=vcodec, overwrite=True)
encode_video_frames(
img_dir, temp_path, fps, vcodec=vcodec, overwrite=True, encoder_threads=encoder_threads
)
shutil.rmtree(img_dir)
return temp_path
@@ -573,6 +578,9 @@ class LeRobotDataset(torch.utils.data.Dataset):
video_backend: str | None = None,
batch_encoding_size: int = 1,
vcodec: str = "libsvtav1",
streaming_encoding: bool = False,
encoder_queue_maxsize: int = 30,
encoder_threads: int | None = None,
):
"""
2 modes are available for instantiating this class, depending on 2 different use cases:
@@ -659,7 +667,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
repo_id (str): This is the repo id that will be used to fetch the dataset. Locally, the dataset
will be stored under root/repo_id.
root (Path | None, optional): Local directory to use for downloading/writing files. You can also
set the LEROBOT_HOME environment variable to point to a different location. Defaults to
set the HF_LEROBOT_HOME environment variable to point to a different location. Defaults to
'~/.cache/huggingface/lerobot'.
episodes (list[int] | None, optional): If specified, this will only load episodes specified by
their episode_index in this list. Defaults to None.
@@ -686,12 +694,17 @@ class LeRobotDataset(torch.utils.data.Dataset):
batch_encoding_size (int, optional): Number of episodes to accumulate before batch encoding videos.
Set to 1 for immediate encoding (default), or higher for batched encoding. Defaults to 1.
vcodec (str, optional): Video codec for encoding videos during recording. Options: 'h264', 'hevc',
'libsvtav1'. Defaults to 'libsvtav1'. Use 'h264' for faster encoding on systems where AV1
encoding is CPU-heavy.
'libsvtav1', 'auto', or hardware-specific codecs like 'h264_videotoolbox', 'h264_nvenc'.
Defaults to 'libsvtav1'. Use 'auto' to auto-detect the best available hardware encoder.
streaming_encoding (bool, optional): If True, encode video frames in real-time during capture
instead of writing PNG images first. This makes save_episode() near-instant. Defaults to False.
encoder_queue_maxsize (int, optional): Maximum number of frames to buffer per camera when using
streaming encoding. Defaults to 30 (~1s at 30fps).
encoder_threads (int | None, optional): Number of threads per encoder instance. None lets the
codec auto-detect (default). Lower values reduce CPU usage per encoder. Maps to 'lp' (via svtav1-params) for
libsvtav1 and 'threads' for h264/hevc.
"""
super().__init__()
if vcodec not in VALID_VIDEO_CODECS:
raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}")
self.repo_id = repo_id
self.root = Path(root) if root else HF_LEROBOT_HOME / repo_id
self.image_transforms = image_transforms
@@ -703,7 +716,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.delta_indices = None
self.batch_encoding_size = batch_encoding_size
self.episodes_since_last_encoding = 0
self.vcodec = vcodec
self.vcodec = resolve_vcodec(vcodec)
self._encoder_threads = encoder_threads
# Unused attributes
self.image_writer = None
@@ -711,6 +725,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.writer = None
self.latest_episode = None
self._current_file_start_frame = None # Track the starting frame index of the current parquet file
self._streaming_encoder = None
self.root.mkdir(exist_ok=True, parents=True)
@@ -752,6 +767,19 @@ class LeRobotDataset(torch.utils.data.Dataset):
check_delta_timestamps(self.delta_timestamps, self.fps, self.tolerance_s)
self.delta_indices = get_delta_indices(self.delta_timestamps, self.fps)
# Initialize streaming encoder for resumed recording
if streaming_encoding and len(self.meta.video_keys) > 0:
self._streaming_encoder = StreamingVideoEncoder(
fps=self.meta.fps,
vcodec=self.vcodec,
pix_fmt="yuv420p",
g=2,
crf=30,
preset=None,
queue_maxsize=encoder_queue_maxsize,
encoder_threads=encoder_threads,
)
def _close_writer(self) -> None:
"""Close and cleanup the parquet writer if it exists."""
writer = getattr(self, "writer", None)
@@ -1070,17 +1098,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
if len(self.meta.video_keys) > 0:
current_ts = item["timestamp"].item()
query_timestamps = self._get_query_timestamps(current_ts, query_indices)
try:
video_frames = self._query_videos(query_timestamps, ep_idx)
except Exception as e:
print("\n" + "=" * 120)
print("[VIDEO DECODE FAILURE]")
print(f"item={item}")
print(f"query_indices={query_indices}")
print(f"query_timestamps={query_timestamps}")
print(f"ep_idx={ep_idx}")
print("=" * 120 + "\n")
raise
video_frames = self._query_videos(query_timestamps, ep_idx)
item = {**video_frames, **item}
if self.image_transforms is not None:
@@ -1091,14 +1109,6 @@ class LeRobotDataset(torch.utils.data.Dataset):
# Add task as a string
task_idx = item["task_index"].item()
item["task"] = self.meta.tasks.iloc[task_idx].name
# optionally add high level task index
if "task_index_high_level" in self.features:
high_level_task_idx = item["task_index_high_level"].item()
item["robot_utterance"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["robot_utterance"]
item["user_prompt"] = self.meta.tasks_high_level.iloc[high_level_task_idx]["user_prompt"]
# add subtask information if available
if "subtask_index" in self.features and self.meta.subtasks is not None:
@@ -1125,6 +1135,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
"""
self._close_writer()
self.meta._close_writer()
if self._streaming_encoder is not None:
self._streaming_encoder.close()
def create_episode_buffer(self, episode_index: int | None = None) -> dict:
current_ep_idx = self.meta.total_episodes if episode_index is None else episode_index
@@ -1179,6 +1191,13 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.episode_buffer["timestamp"].append(timestamp)
self.episode_buffer["task"].append(frame.pop("task")) # Remove task from frame after processing
# Start streaming encoder on first frame of episode (once, before iterating keys)
if frame_index == 0 and self._streaming_encoder is not None:
self._streaming_encoder.start_episode(
video_keys=list(self.meta.video_keys),
temp_dir=self.root,
)
# Add frame features to episode_buffer
for key in frame:
if key not in self.features:
@@ -1186,7 +1205,10 @@ class LeRobotDataset(torch.utils.data.Dataset):
f"An element of the frame is not in the features. '{key}' not in '{self.features.keys()}'."
)
if self.features[key]["dtype"] in ["image", "video"]:
if self.features[key]["dtype"] == "video" and self._streaming_encoder is not None:
self._streaming_encoder.feed_frame(key, frame[key])
self.episode_buffer[key].append(None) # Placeholder (video keys are skipped in parquet)
elif self.features[key]["dtype"] in ["image", "video"]:
img_path = self._get_image_file_path(
episode_index=self.episode_buffer["episode_index"], image_key=key, frame_index=frame_index
)
@@ -1247,13 +1269,38 @@ class LeRobotDataset(torch.utils.data.Dataset):
# Wait for image writer to end, so that episode stats over images can be computed
self._wait_image_writer()
ep_stats = compute_episode_stats(episode_buffer, self.features)
ep_metadata = self._save_episode_data(episode_buffer)
has_video_keys = len(self.meta.video_keys) > 0
use_streaming = self._streaming_encoder is not None and has_video_keys
use_batched_encoding = self.batch_encoding_size > 1
if has_video_keys and not use_batched_encoding:
if use_streaming:
# Compute stats for non-video features only (video stats come from encoder)
non_video_buffer = {
k: v
for k, v in episode_buffer.items()
if self.features.get(k, {}).get("dtype") not in ("video",)
}
non_video_features = {k: v for k, v in self.features.items() if v["dtype"] != "video"}
ep_stats = compute_episode_stats(non_video_buffer, non_video_features)
else:
ep_stats = compute_episode_stats(episode_buffer, self.features)
ep_metadata = self._save_episode_data(episode_buffer)
if use_streaming:
# Finish streaming encoding and collect results
streaming_results = self._streaming_encoder.finish_episode()
for video_key in self.meta.video_keys:
temp_path, video_stats = streaming_results[video_key]
if video_stats is not None:
# Format stats same as compute_episode_stats: normalize to [0,1], reshape to (C,1,1)
ep_stats[video_key] = {
k: v if k == "count" else np.squeeze(v.reshape(1, -1, 1, 1) / 255.0, axis=0)
for k, v in video_stats.items()
}
ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path))
elif has_video_keys and not use_batched_encoding:
num_cameras = len(self.meta.video_keys)
if parallel_encoding and num_cameras > 1:
# TODO(Steven): Ideally we would like to control the number of threads per encoding such that:
@@ -1267,6 +1314,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.root,
self.fps,
self.vcodec,
self._encoder_threads,
): video_key
for video_key in self.meta.video_keys
}
@@ -1535,6 +1583,10 @@ class LeRobotDataset(torch.utils.data.Dataset):
return metadata
def clear_episode_buffer(self, delete_images: bool = True) -> None:
# Cancel streaming encoder if active
if self._streaming_encoder is not None:
self._streaming_encoder.cancel_episode()
# Clean up image files for the current episode buffer
if delete_images:
# Wait for the async image writer to finish
@@ -1582,7 +1634,9 @@ class LeRobotDataset(torch.utils.data.Dataset):
Note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speed up encoding,
since video encoding with ffmpeg is already using multithreading.
"""
return _encode_video_worker(video_key, episode_index, self.root, self.fps, self.vcodec)
return _encode_video_worker(
video_key, episode_index, self.root, self.fps, self.vcodec, self._encoder_threads
)
@classmethod
def create(
@@ -1599,10 +1653,13 @@ class LeRobotDataset(torch.utils.data.Dataset):
video_backend: str | None = None,
batch_encoding_size: int = 1,
vcodec: str = "libsvtav1",
metadata_buffer_size: int = 10,
streaming_encoding: bool = False,
encoder_queue_maxsize: int = 30,
encoder_threads: int | None = None,
) -> "LeRobotDataset":
"""Create a LeRobot Dataset from scratch in order to record data."""
if vcodec not in VALID_VIDEO_CODECS:
raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}")
vcodec = resolve_vcodec(vcodec)
obj = cls.__new__(cls)
obj.meta = LeRobotDatasetMetadata.create(
repo_id=repo_id,
@@ -1611,6 +1668,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
features=features,
root=root,
use_videos=use_videos,
metadata_buffer_size=metadata_buffer_size,
)
obj.repo_id = obj.meta.repo_id
obj.root = obj.meta.root
@@ -1620,6 +1678,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
obj.batch_encoding_size = batch_encoding_size
obj.episodes_since_last_encoding = 0
obj.vcodec = vcodec
obj._encoder_threads = encoder_threads
if image_writer_processes or image_writer_threads:
obj.start_image_writer(image_writer_processes, image_writer_threads)
@@ -1641,6 +1700,22 @@ class LeRobotDataset(torch.utils.data.Dataset):
obj._lazy_loading = False
obj._recorded_frames = 0
obj._writer_closed_for_reading = False
# Initialize streaming encoder
if streaming_encoding and len(obj.meta.video_keys) > 0:
obj._streaming_encoder = StreamingVideoEncoder(
fps=fps,
vcodec=vcodec,
pix_fmt="yuv420p",
g=2,
crf=30,
preset=None,
queue_maxsize=encoder_queue_maxsize,
encoder_threads=encoder_threads,
)
else:
obj._streaming_encoder = None
return obj
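The `encoder_queue_maxsize` parameter added above bounds how many frames are buffered per camera during streaming encoding. The internals of `StreamingVideoEncoder` are not shown in this part of the diff, so the following is only a sketch of the general bounded-queue pattern, built on the standard-library `queue` and `threading` modules. `BoundedFrameBuffer` and its methods are hypothetical names, and the choice to block the producer when the queue is full is an assumption; the real encoder may behave differently (for example, by dropping frames).

```python
import queue
import threading

_SENTINEL = object()  # marks the end of an episode for the worker thread


class BoundedFrameBuffer:
    """Hypothetical stand-in for a streaming encoder's per-camera frame queue."""

    def __init__(self, maxsize: int = 30):
        # A bounded queue caps memory use: at most `maxsize` frames are pending.
        self._queue = queue.Queue(maxsize=maxsize)
        self.consumed = []
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def _drain(self) -> None:
        # Consumer loop: a real encoder would encode each frame here.
        while True:
            frame = self._queue.get()
            if frame is _SENTINEL:
                break
            self.consumed.append(frame)

    def feed_frame(self, frame) -> None:
        # Assumption: block the caller when the buffer is full (back-pressure).
        self._queue.put(frame)

    def finish(self) -> list:
        # Signal end of episode, wait for the worker, and return what it processed.
        self._queue.put(_SENTINEL)
        self._worker.join()
        return self.consumed


if __name__ == "__main__":
    buf = BoundedFrameBuffer(maxsize=4)
    for i in range(100):
        buf.feed_frame(i)
    print(len(buf.finish()))
```

Under this pattern, a larger `maxsize` tolerates longer encoder stalls at the cost of memory, which matches the "~1s at 30fps" sizing mentioned in the docstring for the default of 30.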
+3 -29
@@ -62,8 +62,6 @@ CHUNK_FILE_PATTERN = "chunk-{chunk_index:03d}/file-{file_index:03d}"
DEFAULT_TASKS_PATH = "meta/tasks.parquet"
DEFAULT_SUBTASKS_PATH = "meta/subtasks.parquet"
DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_TASKS_HIGH_LEVEL_PATH = "meta/tasks_high_level.parquet"
DEFAULT_SUBTASKS_PATH = "meta/subtasks.parquet"
DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"
DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png"
@@ -124,19 +122,9 @@ def load_nested_dataset(
raise FileNotFoundError(f"Provided directory does not contain any parquet file: {pq_dir}")
with SuppressProgressBars():
# When no filtering needed, Dataset uses memory-mapped loading for efficiency
# PyArrow loads the entire dataset into memory
if episodes is None:
return Dataset.from_parquet([str(path) for path in paths], features=features)
arrow_dataset = pa_ds.dataset(paths, format="parquet")
filter_expr = pa_ds.field("episode_index").isin(episodes)
table = arrow_dataset.to_table(filter=filter_expr)
if features is not None:
table = table.cast(features.arrow_schema)
return Dataset(table)
# We use .from_parquet() memory-mapped loading for efficiency
filters = pa_ds.field("episode_index").isin(episodes) if episodes is not None else None
return Dataset.from_parquet([str(path) for path in paths], filters=filters, features=features)
def get_parquet_num_frames(parquet_path: str | Path) -> int:
@@ -355,20 +343,6 @@ def load_tasks(local_dir: Path) -> pandas.DataFrame:
tasks = pd.read_parquet(local_dir / DEFAULT_TASKS_PATH)
return tasks
def load_tasks_high_level(local_dir: Path) -> pandas.DataFrame | None:
"""Load high-level tasks from tasks_high_level.parquet if it exists."""
tasks_high_level_path = local_dir / DEFAULT_TASKS_HIGH_LEVEL_PATH
if tasks_high_level_path.exists():
return pd.read_parquet(tasks_high_level_path)
return None
def load_subtasks(local_dir: Path) -> pandas.DataFrame | None:
"""Load subtasks from subtasks.parquet if it exists."""
subtasks_path = local_dir / DEFAULT_SUBTASKS_PATH
if subtasks_path.exists():
return pd.read_parquet(subtasks_path)
return None
def load_subtasks(local_dir: Path) -> pandas.DataFrame | None:
"""Load subtasks from subtasks.parquet if it exists."""
@@ -529,7 +529,7 @@ if __name__ == "__main__":
type=str,
required=True,
help="Repository identifier on Hugging Face: a community or a user name `/` the name of the dataset "
"(e.g. `lerobot/pusht`, `cadene/aloha_sim_insertion_human`).",
"(e.g. `lerobot/pusht`, `<USER>/aloha_sim_insertion_human`).",
)
parser.add_argument(
"--branch",
+454 -26
@@ -13,25 +13,106 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import glob
import importlib
import logging
import queue
import shutil
import tempfile
import threading
import warnings
from dataclasses import dataclass, field
from fractions import Fraction
from pathlib import Path
from threading import Lock
from typing import Any, ClassVar
import av
import fsspec
import numpy as np
import pyarrow as pa
import torch
import torchvision
from datasets.features.features import register_feature
from PIL import Image
# List of hardware encoders to probe for auto-selection. Availability depends on the platform and FFmpeg build.
# Determines the order of preference for auto-selection when vcodec="auto" is used.
HW_ENCODERS = [
"h264_videotoolbox", # macOS
"hevc_videotoolbox", # macOS
"h264_nvenc", # NVIDIA GPU
"hevc_nvenc", # NVIDIA GPU
"h264_vaapi", # Linux Intel/AMD
"h264_qsv", # Intel Quick Sync
]
VALID_VIDEO_CODECS = {"h264", "hevc", "libsvtav1", "auto"} | set(HW_ENCODERS)
def _get_codec_options(
vcodec: str,
g: int | None = 2,
crf: int | None = 30,
preset: int | None = None,
) -> dict:
"""Build codec-specific options dict for video encoding."""
options = {}
# GOP size (keyframe interval) - supported by VideoToolbox and software encoders
if g is not None and (vcodec in ("h264_videotoolbox", "hevc_videotoolbox") or vcodec not in HW_ENCODERS):
options["g"] = str(g)
# Quality control (codec-specific parameter names)
if crf is not None:
if vcodec in ("h264", "hevc", "libsvtav1"):
options["crf"] = str(crf)
elif vcodec in ("h264_videotoolbox", "hevc_videotoolbox"):
quality = max(1, min(100, int(100 - crf * 2)))
options["q:v"] = str(quality)
elif vcodec in ("h264_nvenc", "hevc_nvenc"):
options["rc"] = "constqp"
options["qp"] = str(crf)
elif vcodec in ("h264_vaapi",):
options["qp"] = str(crf)
elif vcodec in ("h264_qsv",):
options["global_quality"] = str(crf)
# Preset (only for libsvtav1)
if vcodec == "libsvtav1":
options["preset"] = str(preset) if preset is not None else "12"
return options
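Worked example for the quality mapping above. The VideoToolbox branch turns a CRF value (lower means better quality) into a `q:v` value (higher means better quality) using `100 - 2 * crf`, clamped to the range 1..100. The helper name below is hypothetical and mirrors only that one expression; it is a sketch, not part of the patch.

```python
def crf_to_videotoolbox_quality(crf: int) -> int:
    """Hypothetical helper mirroring the VideoToolbox branch of _get_codec_options."""
    # Invert the scale (CRF: lower is better, q:v: higher is better), then clamp to 1..100.
    return max(1, min(100, int(100 - crf * 2)))


for crf in (0, 30, 60):
    print(crf, crf_to_videotoolbox_quality(crf))
```

With the default `crf=30` this gives a quality of 40, and any CRF of 50 or more is clamped to 1.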
def detect_available_hw_encoders() -> list[str]:
"""Probe PyAV/FFmpeg for available hardware video encoders."""
available = []
for codec_name in HW_ENCODERS:
try:
av.codec.Codec(codec_name, "w")
available.append(codec_name)
except Exception: # nosec B110
pass # nosec B110
return available
def resolve_vcodec(vcodec: str) -> str:
"""Validate vcodec and resolve 'auto' to best available HW encoder, fallback to libsvtav1."""
if vcodec not in VALID_VIDEO_CODECS:
raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}")
if vcodec != "auto":
logging.info(f"Using video codec: {vcodec}")
return vcodec
available = detect_available_hw_encoders()
for encoder in HW_ENCODERS:
if encoder in available:
logging.info(f"Auto-selected video codec: {encoder}")
return encoder
logging.info("No hardware encoder available, falling back to software encoder 'libsvtav1'")
return "libsvtav1"
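The `auto` resolution above amounts to taking the first entry of a preference list that is actually available, with a software fallback. A minimal sketch of that selection follows. It assumes a hard-coded availability set in place of the PyAV probe, and `pick_encoder` is a hypothetical name that does not appear in the patch.

```python
# Preference order copied from HW_ENCODERS in the diff.
PREFERENCE = [
    "h264_videotoolbox",
    "hevc_videotoolbox",
    "h264_nvenc",
    "hevc_nvenc",
    "h264_vaapi",
    "h264_qsv",
]


def pick_encoder(preference: list, available: set, fallback: str) -> str:
    """Return the first preferred encoder that is available, else the fallback."""
    for name in preference:
        if name in available:
            return name
    return fallback


# Assumption: pretend the probe found only the two NVENC encoders.
print(pick_encoder(PREFERENCE, {"hevc_nvenc", "h264_nvenc"}, "libsvtav1"))
# Assumption: pretend no hardware encoder was found at all.
print(pick_encoder(PREFERENCE, set(), "libsvtav1"))
```

Because the list order decides the winner, a machine exposing both NVENC encoders gets `h264_nvenc`, not `hevc_nvenc`.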
def get_safe_default_codec():
if importlib.util.find_spec("torchcodec"):
@@ -309,14 +390,13 @@ def encode_video_frames(
g: int | None = 2,
crf: int | None = 30,
fast_decode: int = 0,
log_level: int | None = av.logging.ERROR,
log_level: int | None = av.logging.WARNING,
overwrite: bool = False,
preset: int | None = None,
encoder_threads: int | None = None,
) -> None:
"""More info on ffmpeg arguments tuning on `benchmark/video/README.md`"""
# Check encoder availability
if vcodec not in ["h264", "hevc", "libsvtav1"]:
raise ValueError(f"Unsupported video codec: {vcodec}. Supported codecs are: h264, hevc, libsvtav1.")
vcodec = resolve_vcodec(vcodec)
video_path = Path(video_path)
imgs_dir = Path(imgs_dir)
@@ -347,21 +427,22 @@ def encode_video_frames(
width, height = dummy_image.size
# Define video codec options
video_options = {}
if g is not None:
video_options["g"] = str(g)
if crf is not None:
video_options["crf"] = str(crf)
video_options = _get_codec_options(vcodec, g, crf, preset)
if fast_decode:
key = "svtav1-params" if vcodec == "libsvtav1" else "tune"
value = f"fast-decode={fast_decode}" if vcodec == "libsvtav1" else "fastdecode"
video_options[key] = value
if vcodec == "libsvtav1":
video_options["preset"] = str(preset) if preset is not None else "12"
if encoder_threads is not None:
if vcodec == "libsvtav1":
lp_param = f"lp={encoder_threads}"
if "svtav1-params" in video_options:
video_options["svtav1-params"] += f":{lp_param}"
else:
video_options["svtav1-params"] = lp_param
else:
video_options["threads"] = str(encoder_threads)
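The thread-count handling above is duplicated later in `_CameraEncoderThread.run`. One way to read it: for `libsvtav1` the count is appended to the colon-separated `svtav1-params` string as `lp=N`, and every other codec gets a plain `threads` option. The function below is a hypothetical standalone restatement of that logic for illustration, not a helper that exists in the patch.

```python
from typing import Optional


def add_thread_option(options: dict, vcodec: str, encoder_threads: Optional[int]) -> dict:
    """Hypothetical helper: return a copy of `options` with the thread setting applied."""
    merged = dict(options)
    if encoder_threads is None:
        return merged
    if vcodec == "libsvtav1":
        lp_param = f"lp={encoder_threads}"
        existing = merged.get("svtav1-params")
        # svtav1-params is a colon-separated list, so append instead of overwriting.
        merged["svtav1-params"] = f"{existing}:{lp_param}" if existing else lp_param
    else:
        merged["threads"] = str(encoder_threads)
    return merged


print(add_thread_option({"svtav1-params": "fast-decode=1"}, "libsvtav1", 4))
print(add_thread_option({}, "libsvtav1", 4))
print(add_thread_option({"crf": "30"}, "h264", 4))
```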
# Set logging level
if log_level is not None:
@@ -480,6 +561,348 @@ def concatenate_video_files(
Path(tmp_concatenate_path).unlink()
class _CameraEncoderThread(threading.Thread):
"""A thread that encodes video frames streamed via a queue into an MP4 file.
One instance is created per camera per episode. Frames are received as numpy arrays
from the main thread, encoded in real-time using PyAV (which releases the GIL during
encoding), and written to disk. Stats are computed incrementally using
RunningQuantileStats and returned via result_queue.
"""
def __init__(
self,
video_path: Path,
fps: int,
vcodec: str,
pix_fmt: str,
g: int | None,
crf: int | None,
preset: int | None,
frame_queue: queue.Queue,
result_queue: queue.Queue,
stop_event: threading.Event,
encoder_threads: int | None = None,
):
super().__init__(daemon=True)
self.video_path = video_path
self.fps = fps
self.vcodec = vcodec
self.pix_fmt = pix_fmt
self.g = g
self.crf = crf
self.preset = preset
self.frame_queue = frame_queue
self.result_queue = result_queue
self.stop_event = stop_event
self.encoder_threads = encoder_threads
def run(self) -> None:
from lerobot.datasets.compute_stats import RunningQuantileStats, auto_downsample_height_width
container = None
output_stream = None
stats_tracker = RunningQuantileStats()
frame_count = 0
try:
logging.getLogger("libav").setLevel(av.logging.WARNING)
while True:
try:
frame_data = self.frame_queue.get(timeout=1)
except queue.Empty:
if self.stop_event.is_set():
break
continue
if frame_data is None:
# Sentinel: flush and close
break
# Ensure HWC uint8 numpy array
if isinstance(frame_data, np.ndarray):
if frame_data.ndim == 3 and frame_data.shape[0] == 3:
# CHW -> HWC
frame_data = frame_data.transpose(1, 2, 0)
if frame_data.dtype != np.uint8:
frame_data = (frame_data * 255).astype(np.uint8)
# Open container on first frame (to get width/height)
if container is None:
height, width = frame_data.shape[:2]
video_options = _get_codec_options(self.vcodec, self.g, self.crf, self.preset)
if self.encoder_threads is not None:
if self.vcodec == "libsvtav1":
lp_param = f"lp={self.encoder_threads}"
if "svtav1-params" in video_options:
video_options["svtav1-params"] += f":{lp_param}"
else:
video_options["svtav1-params"] = lp_param
else:
video_options["threads"] = str(self.encoder_threads)
Path(self.video_path).parent.mkdir(parents=True, exist_ok=True)
container = av.open(str(self.video_path), "w")
output_stream = container.add_stream(self.vcodec, self.fps, options=video_options)
output_stream.pix_fmt = self.pix_fmt
output_stream.width = width
output_stream.height = height
output_stream.time_base = Fraction(1, self.fps)
# Encode frame with explicit timestamps
pil_img = Image.fromarray(frame_data)
video_frame = av.VideoFrame.from_image(pil_img)
video_frame.pts = frame_count
video_frame.time_base = Fraction(1, self.fps)
packet = output_stream.encode(video_frame)
if packet:
container.mux(packet)
# Update stats with downsampled frame (per-channel stats like compute_episode_stats)
img_chw = frame_data.transpose(2, 0, 1) # HWC -> CHW
img_downsampled = auto_downsample_height_width(img_chw)
# Reshape CHW to (H*W, C) for per-channel stats
channels = img_downsampled.shape[0]
img_for_stats = img_downsampled.transpose(1, 2, 0).reshape(-1, channels)
stats_tracker.update(img_for_stats)
frame_count += 1
# Flush encoder
if output_stream is not None:
packet = output_stream.encode()
if packet:
container.mux(packet)
if container is not None:
container.close()
av.logging.restore_default_callback()
# Get stats and put on result queue
if frame_count >= 2:
stats = stats_tracker.get_statistics()
self.result_queue.put(("ok", stats))
else:
self.result_queue.put(("ok", None))
except Exception as e:
logging.error(f"Encoder thread error: {e}")
if container is not None:
with contextlib.suppress(Exception):
container.close()
self.result_queue.put(("error", str(e)))
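The control flow of `run()` is a sentinel-terminated queue consumer. It blocks on the queue with a timeout, checks the stop event only when the queue is empty, exits on `None`, and reports a single `("ok", ...)` or `("error", ...)` tuple. The sketch below shows that pattern in isolation. It counts items instead of encoding frames, and all names in it are illustrative.

```python
import queue
import threading


def consume(frame_queue: queue.Queue, result_queue: queue.Queue, stop_event: threading.Event) -> None:
    """Drain frame_queue until a None sentinel arrives or stop_event is set."""
    count = 0
    try:
        while True:
            try:
                item = frame_queue.get(timeout=0.1)
            except queue.Empty:
                # Only honour the stop event when there is nothing left to process.
                if stop_event.is_set():
                    break
                continue
            if item is None:
                break  # sentinel: producer is done
            count += 1  # stand-in for "encode and mux the frame"
        result_queue.put(("ok", count))
    except Exception as exc:
        result_queue.put(("error", str(exc)))


frames = queue.Queue(maxsize=30)
results = queue.Queue(maxsize=1)
stop = threading.Event()
worker = threading.Thread(target=consume, args=(frames, results, stop), daemon=True)
worker.start()

for i in range(5):
    frames.put(i)
frames.put(None)  # ask the worker to flush and exit

worker.join(timeout=5)
status, processed = results.get(timeout=1)
print(status, processed)
```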
class StreamingVideoEncoder:
"""Manages per-camera encoder threads for real-time video encoding during recording.
Instead of writing frames as PNG images and then encoding to MP4 at episode end,
this class streams frames directly to encoder threads, eliminating the
PNG round-trip and making save_episode() near-instant.
Uses threading instead of multiprocessing to avoid the overhead of pickling large
numpy arrays through multiprocessing.Queue. PyAV's encode() releases the GIL,
so encoding runs in parallel with the main recording loop.
"""
def __init__(
self,
fps: int,
vcodec: str = "libsvtav1",
pix_fmt: str = "yuv420p",
g: int | None = 2,
crf: int | None = 30,
preset: int | None = None,
queue_maxsize: int = 30,
encoder_threads: int | None = None,
):
self.fps = fps
self.vcodec = resolve_vcodec(vcodec)
self.pix_fmt = pix_fmt
self.g = g
self.crf = crf
self.preset = preset
self.queue_maxsize = queue_maxsize
self.encoder_threads = encoder_threads
self._frame_queues: dict[str, queue.Queue] = {}
self._result_queues: dict[str, queue.Queue] = {}
self._threads: dict[str, _CameraEncoderThread] = {}
self._stop_events: dict[str, threading.Event] = {}
self._video_paths: dict[str, Path] = {}
self._dropped_frames: dict[str, int] = {}
self._episode_active = False
def start_episode(self, video_keys: list[str], temp_dir: Path) -> None:
"""Start encoder threads for a new episode.
Args:
video_keys: List of video feature keys (e.g. ["observation.images.laptop"])
temp_dir: Base directory for temporary MP4 files
"""
if self._episode_active:
self.cancel_episode()
self._dropped_frames.clear()
for video_key in video_keys:
frame_queue: queue.Queue = queue.Queue(maxsize=self.queue_maxsize)
result_queue: queue.Queue = queue.Queue(maxsize=1)
stop_event = threading.Event()
temp_video_dir = Path(tempfile.mkdtemp(dir=temp_dir))
video_path = temp_video_dir / f"{video_key.replace('/', '_')}_streaming.mp4"
encoder_thread = _CameraEncoderThread(
video_path=video_path,
fps=self.fps,
vcodec=self.vcodec,
pix_fmt=self.pix_fmt,
g=self.g,
crf=self.crf,
preset=self.preset,
frame_queue=frame_queue,
result_queue=result_queue,
stop_event=stop_event,
encoder_threads=self.encoder_threads,
)
encoder_thread.start()
self._frame_queues[video_key] = frame_queue
self._result_queues[video_key] = result_queue
self._threads[video_key] = encoder_thread
self._stop_events[video_key] = stop_event
self._video_paths[video_key] = video_path
self._episode_active = True
def feed_frame(self, video_key: str, image: np.ndarray) -> None:
"""Feed a frame to the encoder for a specific camera.
A copy of the image is made before enqueueing to prevent race conditions
with camera drivers that may reuse buffers. If the encoder queue is full
(encoder can't keep up), the frame is dropped with a warning instead of
crashing the recording session.
Args:
video_key: The video feature key
image: numpy array in (H,W,C) or (C,H,W) format, uint8 or float
Raises:
RuntimeError: If the encoder thread has crashed
"""
if not self._episode_active:
raise RuntimeError("No active episode. Call start_episode() first.")
thread = self._threads[video_key]
if not thread.is_alive():
# Check for error
try:
status, msg = self._result_queues[video_key].get_nowait()
if status == "error":
raise RuntimeError(f"Encoder thread for {video_key} crashed: {msg}")
except queue.Empty:
pass
raise RuntimeError(f"Encoder thread for {video_key} is not alive")
try:
self._frame_queues[video_key].put(image.copy(), timeout=0.1)
except queue.Full:
self._dropped_frames[video_key] = self._dropped_frames.get(video_key, 0) + 1
count = self._dropped_frames[video_key]
# Log periodically to avoid spam (1st, then every 10th)
if count == 1 or count % 10 == 0:
logging.warning(
f"Encoder queue full for {video_key}, dropped {count} frame(s). "
f"Consider using vcodec='auto' for hardware encoding or increasing encoder_queue_maxsize."
)
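The drop-and-warn policy in `feed_frame` can be illustrated without any encoder. When the bounded queue is full, the frame is dropped, a counter is incremented, and a warning is emitted on the first drop and on every tenth drop after that. The sketch uses `put_nowait` where the patch uses `put(..., timeout=0.1)`, purely to keep the example deterministic. The class name is hypothetical.

```python
import queue


class DroppingFeeder:
    """Hypothetical stand-in for the drop-on-full part of feed_frame."""

    def __init__(self, maxsize: int):
        self.frames = queue.Queue(maxsize=maxsize)
        self.dropped = 0
        self.warned_at = []  # drop counts at which a warning would be logged

    def feed(self, frame) -> None:
        try:
            self.frames.put_nowait(frame)
        except queue.Full:
            self.dropped += 1
            # Warn on the 1st drop, then every 10th, to avoid log spam.
            if self.dropped == 1 or self.dropped % 10 == 0:
                self.warned_at.append(self.dropped)


feeder = DroppingFeeder(maxsize=2)
for i in range(25):  # nothing consumes, so only the first 2 frames fit
    feeder.feed(i)
print(feeder.dropped, feeder.warned_at)
```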
def finish_episode(self) -> dict[str, tuple[Path, dict | None]]:
"""Finish encoding the current episode.
Sends sentinel values, waits for encoder threads to complete,
and collects results.
Returns:
Dict mapping video_key to (mp4_path, stats_dict_or_None)
"""
if not self._episode_active:
raise RuntimeError("No active episode to finish.")
results = {}
# Report dropped frames
for video_key, count in self._dropped_frames.items():
if count > 0:
logging.warning(f"Episode finished with {count} dropped frame(s) for {video_key}.")
# Send sentinel to all queues
for video_key in self._frame_queues:
self._frame_queues[video_key].put(None)
# Wait for all threads and collect results
for video_key in self._threads:
self._threads[video_key].join(timeout=120)
if self._threads[video_key].is_alive():
logging.error(f"Encoder thread for {video_key} did not finish in time")
self._stop_events[video_key].set()
self._threads[video_key].join(timeout=5)
results[video_key] = (self._video_paths[video_key], None)
continue
try:
status, data = self._result_queues[video_key].get(timeout=5)
if status == "error":
raise RuntimeError(f"Encoder thread for {video_key} failed: {data}")
results[video_key] = (self._video_paths[video_key], data)
except queue.Empty:
logging.error(f"No result from encoder thread for {video_key}")
results[video_key] = (self._video_paths[video_key], None)
self._cleanup()
self._episode_active = False
return results
def cancel_episode(self) -> None:
"""Cancel the current episode, stopping encoder threads and cleaning up."""
if not self._episode_active:
return
# Signal all threads to stop
for video_key in self._stop_events:
self._stop_events[video_key].set()
# Wait for threads to finish
for video_key in self._threads:
self._threads[video_key].join(timeout=5)
# Clean up temp MP4 files
video_path = self._video_paths.get(video_key)
if video_path is not None and video_path.exists():
shutil.rmtree(str(video_path.parent), ignore_errors=True)
self._cleanup()
self._episode_active = False
def close(self) -> None:
"""Close the encoder, canceling any in-progress episode."""
if self._episode_active:
self.cancel_episode()
def _cleanup(self) -> None:
"""Clean up queues and thread tracking dicts."""
for q in self._frame_queues.values():
with contextlib.suppress(Exception):
while not q.empty():
q.get_nowait()
self._frame_queues.clear()
self._result_queues.clear()
self._threads.clear()
self._stop_events.clear()
self._video_paths.clear()
@dataclass
class VideoFrame:
# TODO(rcadene, lhoestq): move to Hugging Face `datasets` repo
@@ -514,7 +937,7 @@ with warnings.catch_warnings():
def get_audio_info(video_path: Path | str) -> dict:
# Set logging level
logging.getLogger("libav").setLevel(av.logging.ERROR)
logging.getLogger("libav").setLevel(av.logging.WARNING)
# Getting audio stream information
audio_info = {}
@@ -546,7 +969,7 @@ def get_audio_info(video_path: Path | str) -> dict:
def get_video_info(video_path: Path | str) -> dict:
# Set logging level
logging.getLogger("libav").setLevel(av.logging.ERROR)
logging.getLogger("libav").setLevel(av.logging.WARNING)
# Getting video stream information
video_info = {}
@@ -632,8 +1055,15 @@ class VideoEncodingManager:
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# Handle any remaining episodes that haven't been batch encoded
if self.dataset.episodes_since_last_encoding > 0:
streaming_encoder = getattr(self.dataset, "_streaming_encoder", None)
if streaming_encoder is not None:
# Handle streaming encoder cleanup
if exc_type is not None:
streaming_encoder.cancel_episode()
streaming_encoder.close()
elif self.dataset.episodes_since_last_encoding > 0:
# Handle any remaining episodes that haven't been batch encoded
if exc_type is not None:
logging.info("Exception occurred. Encoding remaining episodes before exit...")
else:
@@ -650,8 +1080,8 @@ class VideoEncodingManager:
# Finalize the dataset to properly close all writers
self.dataset.finalize()
# Clean up episode images if recording was interrupted
if exc_type is not None:
# Clean up episode images if recording was interrupted (only for non-streaming mode)
if exc_type is not None and streaming_encoder is None:
interrupted_episode_index = self.dataset.num_episodes
for key in self.dataset.meta.video_keys:
img_dir = self.dataset._get_image_file_path(
@@ -665,14 +1095,12 @@ class VideoEncodingManager:
# Clean up any remaining images directory if it's empty
img_dir = self.dataset.root / "images"
# Check for any remaining PNG files
png_files = list(img_dir.rglob("*.png"))
if len(png_files) == 0:
# Only remove the images directory if no PNG files remain
if img_dir.exists():
if img_dir.exists():
png_files = list(img_dir.rglob("*.png"))
if len(png_files) == 0:
shutil.rmtree(img_dir)
logging.debug("Cleaned up empty images directory")
else:
logging.debug(f"Images directory is not empty, containing {len(png_files)} PNG files")
else:
logging.debug(f"Images directory is not empty, containing {len(png_files)} PNG files")
return False # Don't suppress the original exception
+8 -2
@@ -105,6 +105,7 @@ def make_env(
use_async_envs: bool = False,
hub_cache_dir: str | None = None,
trust_remote_code: bool = False,
**kwargs,
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
"""Makes a gym vector environment according to the config or Hub reference.
@@ -118,6 +119,9 @@ def make_env(
hub_cache_dir (str | None): Optional cache path for downloaded hub files.
trust_remote_code (bool): **Explicit consent** to execute remote code from the Hub.
Defaults to False; must be set to True to import/exec the hub `env.py`.
**kwargs: Additional keyword arguments passed to the hub environment's `make_env` function.
Useful for passing custom configurations like `config_path`, `config_overrides`, etc.
Raises:
ValueError: if n_envs < 1
ModuleNotFoundError: If the requested env package is not installed
@@ -149,9 +153,11 @@ def make_env(
# import and surface clear import errors
module = _import_hub_module(local_file, repo_id)
# call the hub-provided make_env
# call the hub-provided make_env with any additional kwargs
env_cfg = None if isinstance(cfg, str) else cfg
raw_result = _call_make_env(module, n_envs=n_envs, use_async_envs=use_async_envs, cfg=env_cfg)
raw_result = _call_make_env(
module, n_envs=n_envs, use_async_envs=use_async_envs, cfg=env_cfg, **kwargs
)
# normalize the return into {suite: {task_id: vec_env}}
return _normalize_hub_result(raw_result)
+7 -2
@@ -112,6 +112,7 @@ class LiberoEnv(gym.Env):
visualization_height: int = 480,
init_states: bool = True,
episode_index: int = 0,
n_envs: int = 1,
camera_name_mapping: dict[str, str] | None = None,
num_steps_wait: int = 10,
control_mode: str = "relative",
@@ -145,7 +146,9 @@ class LiberoEnv(gym.Env):
self.episode_length = episode_length
# Load once and keep
self._init_states = get_task_init_states(task_suite, self.task_id) if self.init_states else None
self._init_state_id = self.episode_index # tie each sub-env to a fixed init state
self._reset_stride = n_envs # on each reset, add `_reset_stride` to `init_state_id`.
self.init_state_id = self.episode_index # tie each sub-env to a fixed init state
self._env = self._make_envs_task(task_suite, self.task_id)
default_steps = 500
@@ -295,7 +298,8 @@ class LiberoEnv(gym.Env):
self._env.seed(seed)
raw_obs = self._env.reset()
if self.init_states and self._init_states is not None:
raw_obs = self._env.set_init_state(self._init_states[self._init_state_id])
raw_obs = self._env.set_init_state(self._init_states[self.init_state_id % len(self._init_states)])
self.init_state_id += self._reset_stride # advance to the next init state on every reset
# After reset, objects may be unstable (slightly floating, intersecting, etc.).
# Step the simulator with a no-op action for a few frames so everything settles.
@@ -373,6 +377,7 @@ def _make_env_fns(
init_states=init_states,
episode_length=episode_length,
episode_index=episode_index,
n_envs=n_envs,
control_mode=control_mode,
**local_kwargs,
)
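Worked example of the new init-state rotation in `LiberoEnv.reset`. Each sub-env starts at `episode_index`, uses `init_state_id % len(init_states)`, and then advances by `n_envs`. The sketch assumes that `episode_index` equals the sub-env's position in the vector env (0, 1, 2, ...) and uses a hypothetical helper name. Neither assumption is confirmed by this diff.

```python
def init_state_sequence(episode_index: int, n_envs: int, num_init_states: int, resets: int) -> list:
    """Return the init-state indices a single sub-env would use over `resets` resets."""
    init_state_id = episode_index
    used = []
    for _ in range(resets):
        used.append(init_state_id % num_init_states)  # wrap around the available states
        init_state_id += n_envs  # stride by the number of parallel envs
    return used


# Assumption: 3 parallel envs, 5 stored init states, 4 resets each.
for env_idx in range(3):
    print(env_idx, init_state_sequence(env_idx, n_envs=3, num_init_states=5, resets=4))
```

Under these assumptions, sub-env 0 visits states 0, 3, 1, 4. Before this change every reset reused the same state.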
+12 -5
@@ -311,20 +311,27 @@ def _import_hub_module(local_file: str, repo_id: str) -> Any:
return module
def _call_make_env(module: Any, n_envs: int, use_async_envs: bool, cfg: EnvConfig | None) -> Any:
def _call_make_env(module: Any, n_envs: int, use_async_envs: bool, cfg: EnvConfig | None, **kwargs) -> Any:
"""
Ensure module exposes make_env and call it.
Ensure module exposes make_env and call it with any additional kwargs.
Args:
module: The imported hub module containing make_env.
n_envs: Number of parallel environments.
use_async_envs: Whether to use AsyncVectorEnv or SyncVectorEnv.
cfg: Optional EnvConfig; forwarded to make_env only when it is not None.
**kwargs: Additional keyword arguments to pass to the hub's make_env function.
Common examples include config_path, config_overrides, etc.
"""
if not hasattr(module, "make_env"):
raise AttributeError(
f"The hub module {getattr(module, '__name__', 'hub_module')} must expose `make_env(n_envs=int, use_async_envs=bool)`."
f"The hub module {getattr(module, '__name__', 'hub_module')} must expose `make_env(n_envs=int, use_async_envs=bool, **kwargs)`."
)
entry_fn = module.make_env
# Only pass cfg if it's not None (i.e., when an EnvConfig was provided, not a string hub ID)
if cfg is not None:
return entry_fn(n_envs=n_envs, use_async_envs=use_async_envs, cfg=cfg)
return entry_fn(n_envs=n_envs, use_async_envs=use_async_envs, cfg=cfg, **kwargs)
else:
return entry_fn(n_envs=n_envs, use_async_envs=use_async_envs)
return entry_fn(n_envs=n_envs, use_async_envs=use_async_envs, **kwargs)
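A minimal sketch of the forwarding behaviour above, using a fake hub module built from `types.SimpleNamespace`. The names `call_make_env` and `fake_make_env` and the `config_path` value are illustrative only; real hub modules define their own `make_env` signature.

```python
from types import SimpleNamespace


def call_make_env(module, n_envs, use_async_envs, cfg=None, **kwargs):
    """Illustrative copy of the forwarding logic: pass cfg only when it is provided."""
    if not hasattr(module, "make_env"):
        raise AttributeError("hub module must expose make_env(n_envs, use_async_envs, **kwargs)")
    if cfg is not None:
        return module.make_env(n_envs=n_envs, use_async_envs=use_async_envs, cfg=cfg, **kwargs)
    return module.make_env(n_envs=n_envs, use_async_envs=use_async_envs, **kwargs)


def fake_make_env(n_envs, use_async_envs, config_path=None, **extra):
    """Stand-in for a hub-provided make_env that accepts extra keyword arguments."""
    return {"n_envs": n_envs, "async": use_async_envs, "config_path": config_path, "extra": extra}


hub_module = SimpleNamespace(make_env=fake_make_env)
result = call_make_env(hub_module, n_envs=2, use_async_envs=False, config_path="my_config.yaml")
print(result)
```

A hub `make_env` that does not accept `**kwargs` raises `TypeError` when it receives an argument it does not declare, so callers should pass only arguments the target module documents.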
def _normalize_hub_result(result: Any) -> dict[str, dict[int, gym.vector.VectorEnv]]:
+6 -4
@@ -221,7 +221,7 @@ class RangeFinderGUI:
self.bus = bus
self.groups = groups if groups is not None else {"all": list(bus.motors)}
self.group_names = list(groups)
self.group_names = list(self.groups)
self.current_group = self.group_names[0]
if not bus.is_connected:
@@ -230,18 +230,20 @@ class RangeFinderGUI:
self.calibration = bus.read_calibration()
self.res_table = bus.model_resolution_table
self.present_cache = {
m: bus.read("Present_Position", m, normalize=False) for motors in groups.values() for m in motors
m: bus.read("Present_Position", m, normalize=False)
for motors in self.groups.values()
for m in motors
}
pygame.init()
self.font = pygame.font.Font(None, FONT_SIZE)
label_pad = max(self.font.size(m)[0] for ms in groups.values() for m in ms)
label_pad = max(self.font.size(m)[0] for ms in self.groups.values() for m in ms)
self.label_pad = label_pad
width = 40 + label_pad + BAR_LEN + 6 + BTN_W + 10 + SAVE_W + 10
self.controls_bottom = 10 + SAVE_H
self.base_y = self.controls_bottom + TOP_GAP
height = self.base_y + PADDING_Y * len(groups[self.current_group]) + 40
height = self.base_y + PADDING_Y * len(self.groups[self.current_group]) + 40
self.screen = pygame.display.set_mode((width, height))
pygame.display.set_caption("Motors range finder")
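The fix in this hunk is that everything after the default is applied must read `self.groups` and not the raw `groups` argument, which may be `None`. The sketch below reproduces only that defaulting step, without pygame or a motor bus. `resolve_groups` is a hypothetical name.

```python
def resolve_groups(motors, groups=None):
    """Apply the default group and derive the group names from the resolved mapping."""
    resolved = groups if groups is not None else {"all": list(motors)}
    # Iterating the resolved dict is safe; list(groups) would fail when groups is None.
    return resolved, list(resolved)


resolved, names = resolve_groups(["shoulder_pan", "elbow_flex"])
print(resolved, names)

try:
    list(None)  # what the old code effectively did when no groups were passed
except TypeError as exc:
    print("old behaviour:", exc)
```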
+41 -15
@@ -23,6 +23,7 @@ from copy import deepcopy
from functools import cached_property
from typing import TYPE_CHECKING, Any, TypedDict
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.import_utils import _can_available
if TYPE_CHECKING or _can_available:
@@ -36,7 +37,6 @@ else:
import numpy as np
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.utils import enter_pressed, move_cursor_up
@@ -155,6 +155,7 @@ class DamiaoMotorsBus(MotorsBusBase):
"""Check if the CAN bus is connected."""
return self._is_connected and self.canbus is not None
@check_if_already_connected
def connect(self, handshake: bool = True) -> None:
"""
Open the CAN bus and initialize communication.
@@ -162,10 +163,6 @@ class DamiaoMotorsBus(MotorsBusBase):
Args:
handshake: If True, ping all motors to verify they're present
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(
f"{self.__class__.__name__}('{self.port}') is already connected."
)
try:
# Auto-detect interface type based on port name
@@ -211,6 +208,9 @@ class DamiaoMotorsBus(MotorsBusBase):
logger.info("Starting handshake with motors...")
# Drain any pending messages
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
while self.canbus.recv(timeout=0.01):
pass
@@ -246,6 +246,7 @@ class DamiaoMotorsBus(MotorsBusBase):
)
logger.info("Handshake successful. All motors ready.")
@check_if_not_connected
def disconnect(self, disable_torque: bool = True) -> None:
"""
Close the CAN bus connection.
@@ -253,8 +254,6 @@ class DamiaoMotorsBus(MotorsBusBase):
Args:
disable_torque: If True, disable torque on all motors before disconnecting
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self.__class__.__name__}('{self.port}') is not connected.")
if disable_torque:
try:
@@ -283,6 +282,10 @@ class DamiaoMotorsBus(MotorsBusBase):
recv_id = self._get_motor_recv_id(motor)
data = [0xFF] * 7 + [command_byte]
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
self.canbus.send(msg)
if msg := self._recv_motor_response(expected_recv_id=recv_id):
self._process_response(motor_name, msg)
@@ -341,6 +344,10 @@ class DamiaoMotorsBus(MotorsBusBase):
recv_id = self._get_motor_recv_id(motor)
data = [motor_id & 0xFF, (motor_id >> 8) & 0xFF, CAN_CMD_REFRESH, 0, 0, 0, 0, 0]
msg = can.Message(arbitration_id=CAN_PARAM_ID, data=data, is_extended_id=False, is_fd=self.use_can_fd)
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
self.canbus.send(msg)
return self._recv_motor_response(expected_recv_id=recv_id)
@@ -356,6 +363,10 @@ class DamiaoMotorsBus(MotorsBusBase):
Returns:
CAN message if received, None otherwise
"""
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
try:
start_time = time.time()
messages_seen = []
@@ -394,10 +405,13 @@ class DamiaoMotorsBus(MotorsBusBase):
Returns:
Dictionary mapping recv_id to CAN message
"""
responses = {}
responses: dict[int, can.Message] = {}
expected_set = set(expected_recv_ids)
start_time = time.time()
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
try:
while len(responses) < len(expected_recv_ids) and (time.time() - start_time) < timeout:
# 100us poll timeout
@@ -461,6 +475,9 @@ class DamiaoMotorsBus(MotorsBusBase):
motor_name = self._get_motor_name(motor)
motor_type = self._motor_types[motor_name]
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
data = self._encode_mit_packet(motor_type, kp, kd, position_degrees, velocity_deg_per_sec, torque)
msg = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False, is_fd=self.use_can_fd)
self.canbus.send(msg)
@@ -488,6 +505,9 @@ class DamiaoMotorsBus(MotorsBusBase):
recv_id_to_motor: dict[int, str] = {}
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
# Step 1: Send all MIT control commands
for motor, (kp, kd, position_degrees, velocity_deg_per_sec, torque) in commands.items():
motor_id = self._get_motor_id(motor)
@@ -562,10 +582,9 @@ class DamiaoMotorsBus(MotorsBusBase):
except Exception as e:
logger.warning(f"Failed to decode response from {motor}: {e}")
@check_if_not_connected
def read(self, data_name: str, motor: str) -> Value:
"""Read a value from a single motor. Positions are always in degrees."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Refresh motor to get latest state
msg = self._refresh_motor(motor)
@@ -595,6 +614,7 @@ class DamiaoMotorsBus(MotorsBusBase):
raise ValueError(f"Unknown data_name: {data_name}")
return mapping[data_name]
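This file replaces the inline `if not self.is_connected: raise ...` checks with decorators imported from `lerobot.utils.decorators`. That module is not shown in this diff, so the following is only a guess at what such a guard might look like. The exception class is a local stand-in and `FakeBus` is hypothetical.

```python
import functools


class DeviceNotConnectedError(Exception):
    """Local stand-in for the error type used in the diff."""


def check_if_not_connected(func):
    """Assumed behaviour: refuse to run the method unless self.is_connected is true."""

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if not self.is_connected:
            raise DeviceNotConnectedError(f"{self.__class__.__name__} is not connected.")
        return func(self, *args, **kwargs)

    return wrapper


class FakeBus:
    def __init__(self):
        self.is_connected = False

    @check_if_not_connected
    def read(self, data_name: str) -> int:
        return 0  # placeholder value; a real bus would query the motor


bus = FakeBus()
try:
    bus.read("Present_Position")
except DeviceNotConnectedError as exc:
    print("blocked:", exc)

bus.is_connected = True
print(bus.read("Present_Position"))
```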
@check_if_not_connected
def write(
self,
data_name: str,
@@ -605,8 +625,6 @@ class DamiaoMotorsBus(MotorsBusBase):
Write a value to a single motor. Positions are always in degrees.
Can write 'Goal_Position', 'Kp', or 'Kd'.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if data_name in ("Kp", "Kd"):
self._gains[motor][data_name.lower()] = float(value)
@@ -656,6 +674,10 @@ class DamiaoMotorsBus(MotorsBusBase):
def _batch_refresh(self, motors: list[str]) -> None:
"""Internal helper to refresh a list of motors and update cache."""
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
# Send refresh commands
for motor in motors:
motor_id = self._get_motor_id(motor)
@@ -678,10 +700,12 @@ class DamiaoMotorsBus(MotorsBusBase):
else:
logger.warning(f"Packet drop: {motor} (ID: 0x{recv_id:02X}). Using last known state.")
def sync_write(self, data_name: str, values: Value | dict[str, Value]) -> None:
@check_if_not_connected
def sync_write(self, data_name: str, values: dict[str, Value]) -> None:
"""
Write values to multiple motors simultaneously. Positions are always in degrees.
"""
if data_name in ("Kp", "Kd"):
key = data_name.lower()
for motor, val in values.items():
@@ -690,6 +714,8 @@ class DamiaoMotorsBus(MotorsBusBase):
elif data_name == "Goal_Position":
# Step 1: Send all MIT control commands
recv_id_to_motor: dict[int, str] = {}
if self.canbus is None:
raise RuntimeError("CAN bus is not initialized.")
for motor, value_degrees in values.items():
motor_id = self._get_motor_id(motor)
motor_name = self._get_motor_name(motor)
@@ -732,9 +758,9 @@ class DamiaoMotorsBus(MotorsBusBase):
def record_ranges_of_motion(
self,
motors: NameOrID | list[NameOrID] | None = None,
motors: str | list[str] | None = None,
display_values: bool = True,
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
) -> tuple[dict[str, Value], dict[str, Value]]:
"""
Interactively record the min/max values of each motor in degrees.
+8 -8
@@ -181,10 +181,10 @@ class DynamixelMotorsBus(SerialMotorsBus):
for motor, m in self.motors.items():
calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=drive_modes[motor],
homing_offset=offsets[motor],
range_min=mins[motor],
range_max=maxes[motor],
drive_mode=int(drive_modes[motor]),
homing_offset=int(offsets[motor]),
range_min=int(mins[motor]),
range_max=int(maxes[motor]),
)
return calibration
@@ -198,7 +198,7 @@ class DynamixelMotorsBus(SerialMotorsBus):
if cache:
self.calibration = calibration_dict
def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry)
@@ -206,7 +206,7 @@ class DynamixelMotorsBus(SerialMotorsBus):
addr, length = get_address(self.model_ctrl_table, model, "Torque_Enable")
self._write(addr, length, motor, TorqueMode.DISABLED.value, num_retry=num_retry)
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
def enable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry)
@@ -235,7 +235,7 @@ class DynamixelMotorsBus(SerialMotorsBus):
On Dynamixel Motors:
Present_Position = Actual_Position + Homing_Offset
"""
half_turn_homings = {}
half_turn_homings: dict[NameOrID, Value] = {}
for motor, pos in positions.items():
model = self._get_motor_model(motor)
max_res = self.model_resolution_table[model] - 1
@@ -258,6 +258,6 @@ class DynamixelMotorsBus(SerialMotorsBus):
if raise_on_error:
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
return
return None
return {id_: data[0] for id_, data in data_list.items()}
+9 -9
@@ -126,7 +126,7 @@ class FeetechMotorsBus(SerialMotorsBus):
self.port_handler = scs.PortHandler(self.port)
# HACK: monkeypatch
self.port_handler.setPacketTimeout = patch_setPacketTimeout.__get__(
self.port_handler.setPacketTimeout = patch_setPacketTimeout.__get__( # type: ignore[method-assign]
self.port_handler, scs.PortHandler
)
self.packet_handler = scs.PacketHandler(protocol_version)
@@ -262,9 +262,9 @@ class FeetechMotorsBus(SerialMotorsBus):
calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
homing_offset=offsets[motor],
range_min=mins[motor],
range_max=maxes[motor],
homing_offset=int(offsets[motor]),
range_min=int(mins[motor]),
range_max=int(maxes[motor]),
)
return calibration
@@ -284,7 +284,7 @@ class FeetechMotorsBus(SerialMotorsBus):
On Feetech Motors:
Present_Position = Actual_Position - Homing_Offset
"""
half_turn_homings = {}
half_turn_homings: dict[NameOrID, Value] = {}
for motor, pos in positions.items():
model = self._get_motor_model(motor)
max_res = self.model_resolution_table[model] - 1
@@ -292,7 +292,7 @@ class FeetechMotorsBus(SerialMotorsBus):
return half_turn_homings
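The two docstrings above state opposite sign conventions (Dynamixel: `Present_Position = Actual_Position + Homing_Offset`; Feetech: `Present_Position = Actual_Position - Homing_Offset`), so the offset that centres a motor flips sign between the two buses. A minimal sketch of that arithmetic follows. The helper names, the 4096-count resolution, and the choice of `(resolution - 1) // 2` as the half-turn target are assumptions for illustration, not code from this diff.

```python
def half_turn_offset_dynamixel(actual_pos: int, resolution: int = 4096) -> int:
    """Offset so that actual_pos + offset lands on the half-turn value."""
    half_turn = (resolution - 1) // 2  # assumed target: middle of the encoder range
    return half_turn - actual_pos


def half_turn_offset_feetech(actual_pos: int, resolution: int = 4096) -> int:
    """Offset so that actual_pos - offset lands on the half-turn value."""
    half_turn = (resolution - 1) // 2  # assumed target: middle of the encoder range
    return actual_pos - half_turn


# Both conventions report the same present position once the offset is applied.
actual = 1000
dxl_present = actual + half_turn_offset_dynamixel(actual)
sts_present = actual - half_turn_offset_feetech(actual)
```

With `actual = 1000`, the Dynamixel offset is positive and the Feetech offset is its negation, yet both yield the same reported position.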
def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry)
self.write("Lock", motor, 0, num_retry=num_retry)
@@ -303,7 +303,7 @@ class FeetechMotorsBus(SerialMotorsBus):
addr, length = get_address(self.model_ctrl_table, model, "Lock")
self._write(addr, length, motor, 0, num_retry=num_retry)
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
def enable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
for motor in self._get_motors_list(motors):
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry)
self.write("Lock", motor, 1, num_retry=num_retry)
@@ -334,7 +334,7 @@ class FeetechMotorsBus(SerialMotorsBus):
def _broadcast_ping(self) -> tuple[dict[int, int], int]:
import scservo_sdk as scs
data_list = {}
data_list: dict[int, int] = {}
status_length = 6
@@ -414,7 +414,7 @@ class FeetechMotorsBus(SerialMotorsBus):
if not self._is_comm_success(comm):
if raise_on_error:
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
return
return None
ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)}
if ids_errors:
+93 -90
@@ -23,6 +23,7 @@ from __future__ import annotations
import abc
import logging
from collections.abc import Sequence
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
@@ -93,7 +94,7 @@ class MotorsBusBase(abc.ABC):
pass
@abc.abstractmethod
def sync_write(self, data_name: str, values: Value | dict[str, Value]) -> None:
def sync_write(self, data_name: str, values: dict[str, Value]) -> None:
"""Write values to multiple motors."""
pass
@@ -179,15 +180,16 @@ class Motor:
class PortHandler(Protocol):
def __init__(self, port_name):
self.is_open: bool
self.baudrate: int
self.packet_start_time: float
self.packet_timeout: float
self.tx_time_per_byte: float
self.is_using: bool
self.port_name: str
self.ser: serial.Serial
is_open: bool
baudrate: int
packet_start_time: float
packet_timeout: float
tx_time_per_byte: float
is_using: bool
port_name: str
ser: serial.Serial
def __init__(self, port_name: str) -> None: ...
def openPort(self): ...
def closePort(self): ...
@@ -240,19 +242,22 @@ class PacketHandler(Protocol):
def regWriteTxRx(self, port, id, address, length, data): ...
def syncReadTx(self, port, start_address, data_length, param, param_length): ...
def syncWriteTxOnly(self, port, start_address, data_length, param, param_length): ...
def broadcastPing(self, port): ...
class GroupSyncRead(Protocol):
def __init__(self, port, ph, start_address, data_length):
self.port: str
self.ph: PortHandler
self.start_address: int
self.data_length: int
self.last_result: bool
self.is_param_changed: bool
self.param: list
self.data_dict: dict
port: str
ph: PortHandler
start_address: int
data_length: int
last_result: bool
is_param_changed: bool
param: list
data_dict: dict
def __init__(
self, port: PortHandler, ph: PacketHandler, start_address: int, data_length: int
) -> None: ...
def makeParam(self): ...
def addParam(self, id): ...
def removeParam(self, id): ...
@@ -265,15 +270,17 @@ class GroupSyncRead(Protocol):
class GroupSyncWrite(Protocol):
def __init__(self, port, ph, start_address, data_length):
self.port: str
self.ph: PortHandler
self.start_address: int
self.data_length: int
self.is_param_changed: bool
self.param: list
self.data_dict: dict
port: str
ph: PortHandler
start_address: int
data_length: int
is_param_changed: bool
param: list
data_dict: dict
def __init__(
self, port: PortHandler, ph: PacketHandler, start_address: int, data_length: int
) -> None: ...
def makeParam(self): ...
def addParam(self, id, data): ...
def removeParam(self, id): ...
@@ -400,7 +407,7 @@ class SerialMotorsBus(MotorsBusBase):
else:
raise TypeError(f"'{motor}' should be int, str.")
def _get_motor_model(self, motor: NameOrID) -> int:
def _get_motor_model(self, motor: NameOrID) -> str:
if isinstance(motor, str):
return self.motors[motor].model
elif isinstance(motor, int):
@@ -408,17 +415,19 @@ class SerialMotorsBus(MotorsBusBase):
else:
raise TypeError(f"'{motor}' should be int, str.")
def _get_motors_list(self, motors: str | list[str] | None) -> list[str]:
def _get_motors_list(self, motors: NameOrID | Sequence[NameOrID] | None) -> list[str]:
if motors is None:
return list(self.motors)
elif isinstance(motors, str):
return [motors]
elif isinstance(motors, list):
return motors.copy()
elif isinstance(motors, int):
return [self._id_to_name(motors)]
elif isinstance(motors, Sequence):
return [m if isinstance(m, str) else self._id_to_name(m) for m in motors]
else:
raise TypeError(motors)
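The new `_get_motors_list` accepts a name, an ID, or any sequence mixing both, and always returns names. The sketch below reproduces that dispatch as a standalone function, under the assumption that a plain `dict[int, str]` can stand in for the bus's `_id_to_name` lookup; `to_motor_names` and `id_to_name` are illustrative names only.

```python
from collections.abc import Sequence
from typing import Optional, Union

NameOrID = Union[str, int]


def to_motor_names(
    motors: Optional[Union[NameOrID, Sequence[NameOrID]]],
    id_to_name: dict[int, str],
) -> list[str]:
    """Normalise a motor selection to a list of motor names."""
    if motors is None:
        return list(id_to_name.values())
    # str must be tested before Sequence, because a str is itself a Sequence.
    if isinstance(motors, str):
        return [motors]
    if isinstance(motors, int):
        return [id_to_name[motors]]
    if isinstance(motors, Sequence):
        return [m if isinstance(m, str) else id_to_name[m] for m in motors]
    raise TypeError(motors)
```

Switching the check from `list` to `Sequence` means tuples are accepted as well, which is why the `str` branch has to come first.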
def _get_ids_values_dict(self, values: Value | dict[str, Value] | None) -> list[str]:
def _get_ids_values_dict(self, values: Value | dict[str, Value] | None) -> dict[int, Value]:
if isinstance(values, (int | float)):
return dict.fromkeys(self.ids, values)
elif isinstance(values, dict):
@@ -640,18 +649,19 @@ class SerialMotorsBus(MotorsBusBase):
pass
@abc.abstractmethod
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
def enable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
"""Enable torque on selected motors.
Args:
motor (int): Same semantics as :pymeth:`disable_torque`. Defaults to `None`.
motors (int | str | list[str] | None, optional): Same semantics as :pymeth:`disable_torque`.
Defaults to `None`.
num_retry (int, optional): Number of additional retry attempts on communication failure.
Defaults to 0.
"""
pass
@contextmanager
def torque_disabled(self, motors: int | str | list[str] | None = None):
def torque_disabled(self, motors: str | list[str] | None = None):
"""Context-manager that guarantees torque is re-enabled.
This helper is useful to temporarily disable torque when configuring motors.
@@ -728,24 +738,19 @@ class SerialMotorsBus(MotorsBusBase):
"""
pass
def reset_calibration(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
def reset_calibration(self, motors: NameOrID | Sequence[NameOrID] | None = None) -> None:
"""Restore factory calibration for the selected motors.
Homing offset is set to ``0`` and min/max position limits are set to the full usable range.
The in-memory :pyattr:`calibration` is cleared.
Args:
motors (NameOrID | list[NameOrID] | None, optional): Selection of motors. `None` (default)
motors (NameOrID | Sequence[NameOrID] | None, optional): Selection of motors. `None` (default)
resets every motor.
"""
if motors is None:
motors = list(self.motors)
elif isinstance(motors, (str | int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
motor_names = self._get_motors_list(motors)
for motor in motors:
for motor in motor_names:
model = self._get_motor_model(motor)
max_res = self.model_resolution_table[model] - 1
self.write("Homing_Offset", motor, 0, normalize=False)
@@ -754,7 +759,9 @@ class SerialMotorsBus(MotorsBusBase):
self.calibration = {}
def set_half_turn_homings(self, motors: NameOrID | list[NameOrID] | None = None) -> dict[NameOrID, Value]:
def set_half_turn_homings(
self, motors: NameOrID | Sequence[NameOrID] | None = None
) -> dict[NameOrID, Value]:
"""Centre each motor range around its current position.
The function computes and writes a homing offset such that the present position becomes exactly one
@@ -764,17 +771,12 @@ class SerialMotorsBus(MotorsBusBase):
motors (NameOrID | list[NameOrID] | None, optional): Motors to adjust. Defaults to all motors (`None`).
Returns:
dict[NameOrID, Value]: Mapping *motor → written homing offset*.
dict[str, Value]: Mapping *motor name → written homing offset*.
"""
if motors is None:
motors = list(self.motors)
elif isinstance(motors, (str | int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
motor_names = self._get_motors_list(motors)
self.reset_calibration(motors)
actual_positions = self.sync_read("Present_Position", motors, normalize=False)
self.reset_calibration(motor_names)
actual_positions = self.sync_read("Present_Position", motor_names, normalize=False)
homing_offsets = self._get_half_turn_homings(actual_positions)
for motor, offset in homing_offsets.items():
self.write("Homing_Offset", motor, offset)
@@ -786,8 +788,8 @@ class SerialMotorsBus(MotorsBusBase):
pass
def record_ranges_of_motion(
self, motors: NameOrID | list[NameOrID] | None = None, display_values: bool = True
) -> tuple[dict[NameOrID, Value], dict[NameOrID, Value]]:
self, motors: NameOrID | Sequence[NameOrID] | None = None, display_values: bool = True
) -> tuple[dict[str, Value], dict[str, Value]]:
"""Interactively record the min/max encoder values of each motor.
Move the joints by hand (with torque disabled) while the method streams live positions. Press
@@ -799,30 +801,25 @@ class SerialMotorsBus(MotorsBusBase):
display_values (bool, optional): When `True` (default) a live table is printed to the console.
Returns:
tuple[dict[NameOrID, Value], dict[NameOrID, Value]]: Two dictionaries *mins* and *maxes* with the
tuple[dict[str, Value], dict[str, Value]]: Two dictionaries *mins* and *maxes* with the
extreme values observed for each motor.
"""
if motors is None:
motors = list(self.motors)
elif isinstance(motors, (str | int)):
motors = [motors]
elif not isinstance(motors, list):
raise TypeError(motors)
motor_names = self._get_motors_list(motors)
start_positions = self.sync_read("Present_Position", motors, normalize=False)
start_positions = self.sync_read("Present_Position", motor_names, normalize=False)
mins = start_positions.copy()
maxes = start_positions.copy()
user_pressed_enter = False
while not user_pressed_enter:
positions = self.sync_read("Present_Position", motors, normalize=False)
positions = self.sync_read("Present_Position", motor_names, normalize=False)
mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()}
maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()}
if display_values:
print("\n-------------------------------------------")
print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
for motor in motors:
for motor in motor_names:
print(f"{motor:<15} | {mins[motor]:>6} | {positions[motor]:>6} | {maxes[motor]:>6}")
if enter_pressed():
@@ -830,9 +827,9 @@ class SerialMotorsBus(MotorsBusBase):
if display_values and not user_pressed_enter:
# Move cursor up to overwrite the previous output
move_cursor_up(len(motors) + 3)
move_cursor_up(len(motor_names) + 3)
same_min_max = [motor for motor in motors if mins[motor] == maxes[motor]]
same_min_max = [motor for motor in motor_names if mins[motor] == maxes[motor]]
if same_min_max:
raise ValueError(f"Some motors have the same min and max values:\n{pformat(same_min_max)}")
@@ -955,12 +952,12 @@ class SerialMotorsBus(MotorsBusBase):
if raise_on_error:
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
else:
return
return None
if self._is_error(error):
if raise_on_error:
raise RuntimeError(self.packet_handler.getRxPacketError(error))
else:
return
return None
return model_number
@@ -1007,12 +1004,13 @@ class SerialMotorsBus(MotorsBusBase):
err_msg = f"Failed to read '{data_name}' on {id_=} after {num_retry + 1} tries."
value, _, _ = self._read(addr, length, id_, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
id_value = self._decode_sign(data_name, {id_: value})
decoded = self._decode_sign(data_name, {id_: value})
if normalize and data_name in self.normalized_data:
id_value = self._normalize(id_value)
normalized = self._normalize(decoded)
return normalized[id_]
return id_value[id_]
return decoded[id_]
def _read(
self,
@@ -1023,7 +1021,7 @@ class SerialMotorsBus(MotorsBusBase):
num_retry: int = 0,
raise_on_error: bool = True,
err_msg: str = "",
) -> tuple[int, int]:
) -> tuple[int, int, int]:
if length == 1:
read_fn = self.packet_handler.read1ByteTxRx
elif length == 2:
@@ -1073,13 +1071,14 @@ class SerialMotorsBus(MotorsBusBase):
model = self.motors[motor].model
addr, length = get_address(self.model_ctrl_table, model, data_name)
int_value = int(value)
if normalize and data_name in self.normalized_data:
value = self._unnormalize({id_: value})[id_]
int_value = self._unnormalize({id_: value})[id_]
value = self._encode_sign(data_name, {id_: value})[id_]
int_value = self._encode_sign(data_name, {id_: int_value})[id_]
err_msg = f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries."
self._write(addr, length, id_, value, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
err_msg = f"Failed to write '{data_name}' on {id_=} with '{int_value}' after {num_retry + 1} tries."
self._write(addr, length, id_, int_value, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
def _write(
self,
@@ -1113,7 +1112,7 @@ class SerialMotorsBus(MotorsBusBase):
def sync_read(
self,
data_name: str,
motors: str | list[str] | None = None,
motors: NameOrID | Sequence[NameOrID] | None = None,
*,
normalize: bool = True,
num_retry: int = 0,
@@ -1122,7 +1121,7 @@ class SerialMotorsBus(MotorsBusBase):
Args:
data_name (str): Register name.
motors (str | list[str] | None, optional): Motors to query. `None` (default) reads every motor.
motors (NameOrID | Sequence[NameOrID] | None, optional): Motors to query. `None` (default) reads every motor.
normalize (bool, optional): Normalisation flag. Defaults to `True`.
num_retry (int, optional): Retry attempts. Defaults to `0`.
@@ -1143,16 +1142,17 @@ class SerialMotorsBus(MotorsBusBase):
addr, length = get_address(self.model_ctrl_table, model, data_name)
err_msg = f"Failed to sync read '{data_name}' on {ids=} after {num_retry + 1} tries."
ids_values, _ = self._sync_read(
raw_ids_values, _ = self._sync_read(
addr, length, ids, num_retry=num_retry, raise_on_error=True, err_msg=err_msg
)
ids_values = self._decode_sign(data_name, ids_values)
decoded = self._decode_sign(data_name, raw_ids_values)
if normalize and data_name in self.normalized_data:
ids_values = self._normalize(ids_values)
normalized = self._normalize(decoded)
return {self._id_to_name(id_): value for id_, value in normalized.items()}
return {self._id_to_name(id_): value for id_, value in ids_values.items()}
return {self._id_to_name(id_): value for id_, value in decoded.items()}
def _sync_read(
self,
@@ -1224,21 +1224,24 @@ class SerialMotorsBus(MotorsBusBase):
num_retry (int, optional): Retry attempts. Defaults to `0`.
"""
ids_values = self._get_ids_values_dict(values)
models = [self._id_to_model(id_) for id_ in ids_values]
raw_ids_values = self._get_ids_values_dict(values)
models = [self._id_to_model(id_) for id_ in raw_ids_values]
if self._has_different_ctrl_tables:
assert_same_address(self.model_ctrl_table, models, data_name)
model = next(iter(models))
addr, length = get_address(self.model_ctrl_table, model, data_name)
int_ids_values = {id_: int(val) for id_, val in raw_ids_values.items()}
if normalize and data_name in self.normalized_data:
ids_values = self._unnormalize(ids_values)
int_ids_values = self._unnormalize(raw_ids_values)
ids_values = self._encode_sign(data_name, ids_values)
int_ids_values = self._encode_sign(data_name, int_ids_values)
err_msg = f"Failed to sync write '{data_name}' with {ids_values=} after {num_retry + 1} tries."
self._sync_write(addr, length, ids_values, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)
err_msg = f"Failed to sync write '{data_name}' with ids_values={int_ids_values} after {num_retry + 1} tries."
self._sync_write(
addr, length, int_ids_values, num_retry=num_retry, raise_on_error=True, err_msg=err_msg
)
def _sync_write(
self,
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# Copyright 2025 Physical Intelligence and The HuggingFace Inc. team. All rights reserved.
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,8 +14,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .configuration_pi05 import PI05FullConfig
from .modeling_pi05 import PI05FullPolicy
from .processor_pi05 import make_pi05_full_pre_post_processors
__all__ = ["PI05FullConfig", "PI05FullPolicy", "make_pi05_full_pre_post_processors"]
from .robstride import RobstrideMotorsBus
from .tables import *
File diff suppressed because it is too large
+120
@@ -0,0 +1,120 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Configuration tables for Robstride motors."""
from enum import IntEnum
# Motor type definitions
class MotorType(IntEnum):
O0 = 0
O1 = 1
O2 = 2
O3 = 3
O4 = 4
O5 = 5
ELO5 = 6
O6 = 7
class CommMode(IntEnum):
PrivateProtocole = 0
CANopen = 1
MIT = 2
# Control modes
class ControlMode(IntEnum):
MIT = 0
POS_VEL = 1
VEL = 2
# Motor limit parameters [PMAX, VMAX, TMAX]
# PMAX: Maximum position (rad)
# VMAX: Maximum velocity (rad/s)
# TMAX: Maximum torque (N·m)
MOTOR_LIMIT_PARAMS: dict[MotorType, tuple[float, float, float]] = {
MotorType.O0: (12.57, 33, 14),
MotorType.O1: (12.57, 44, 17),
MotorType.O2: (12.57, 33, 20),
MotorType.O3: (12.57, 33, 60),
MotorType.O4: (12.57, 33, 120),
MotorType.O5: (12.57, 50, 5.5),
MotorType.ELO5: (12.57, 50, 6),
MotorType.O6: (112.5, 50, 36),
}
# Motor model names
MODEL_NAMES = {
MotorType.O0: "O0",
MotorType.O1: "O1",
MotorType.O2: "O2",
MotorType.O3: "O3",
MotorType.O4: "O4",
MotorType.O5: "O5",
MotorType.ELO5: "ELO5",
MotorType.O6: "O6",
}
# Motor resolution table (encoder counts per revolution)
MODEL_RESOLUTION = {
"O0": 65536,
"O1": 65536,
"O2": 65536,
"O3": 65536,
"O4": 65536,
"O5": 65536,
"ELO5": 65536,
"O6": 65536,
}
# CAN baudrates supported by Robstride motors
AVAILABLE_BAUDRATES = [
1000000, # 4: 1 Mbps (default)
]
DEFAULT_BAUDRATE = 1000000
# Default timeout in milliseconds
DEFAULT_TIMEOUT_MS = 0 # 0 disables the timeout (default); a value of 20000 corresponds to 1 s
# Data that should be normalized
NORMALIZED_DATA = ["Present_Position", "Goal_Position"]
# MIT control parameter ranges
MIT_KP_RANGE = (0.0, 500.0)
MIT_KD_RANGE = (0.0, 5.0)
# CAN frame command IDs
CAN_CMD_ENABLE = 0xFC
CAN_CMD_DISABLE = 0xFD
CAN_CMD_SET_ZERO = 0xFE
CAN_CMD_CLEAR_FAULT = 0xFB
CAN_CMD_QUERY_PARAM = 0x33
CAN_CMD_WRITE_PARAM = 0x55
CAN_CMD_SAVE_PARAM = 0xAA
# CAN ID for parameter operations
CAN_PARAM_ID = 0x7FF
RUNNING_TIMEOUT = 0.001
PARAM_TIMEOUT = 0.01
STATE_CACHE_TTL_S = 0.02
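One way tables like these might be used is to clamp a command before it is packed into a CAN frame. The sketch below copies `MIT_KP_RANGE`, `MIT_KD_RANGE`, and the `MotorType.O0` limits from the table above. The `clamp_mit_command` helper and the symmetric position range `[-PMAX, PMAX]` are assumptions for illustration; the diff does not show how the bus actually applies these limits.

```python
MIT_KP_RANGE = (0.0, 500.0)
MIT_KD_RANGE = (0.0, 5.0)
LIMITS_O0 = (12.57, 33.0, 14.0)  # (PMAX, VMAX, TMAX) copied from MotorType.O0


def clamp(value: float, low: float, high: float) -> float:
    """Restrict value to the closed interval [low, high]."""
    return max(low, min(high, value))


def clamp_mit_command(
    pos_rad: float,
    kp: float,
    kd: float,
    limits: tuple[float, float, float] = LIMITS_O0,
) -> tuple[float, float, float]:
    """Clamp position and gains; assumes position is symmetric around zero."""
    p_max, _v_max, _t_max = limits
    return (
        clamp(pos_rad, -p_max, p_max),
        clamp(kp, *MIT_KP_RANGE),
        clamp(kd, *MIT_KD_RANGE),
    )
```

Calling `clamp_mit_command(20.0, 600.0, -1.0)` pulls each out-of-range value back to the nearest table bound.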
-8
@@ -34,7 +34,6 @@ from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig
from lerobot.policies.groot.configuration_groot import GrootConfig
from lerobot.policies.pi0.configuration_pi0 import PI0Config
from lerobot.policies.pi05.configuration_pi05 import PI05Config
from lerobot.policies.pi05_full.configuration_pi05 import PI05FullConfig
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.sac.configuration_sac import SACConfig
from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig
@@ -391,13 +390,6 @@ def make_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, PI05FullConfig):
from lerobot.policies.pi05_full.processor_pi05 import make_pi05_full_pre_post_processors
processors = make_pi05_full_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
else:
try:
-49
@@ -1,49 +0,0 @@
# π₀.₅ (pi05)
This repository contains the Hugging Face port of **π₀.₅**, adapted from [OpenPI](https://github.com/Physical-Intelligence/openpi) by Physical Intelligence.
It is designed as a **Vision-Language-Action model with open-world generalization**.
---
## Model Overview
| Feature | π₀ | π₀.₅ |
| -------------------- | ------------------------------------------------------ | ----------------------------------------- |
| Time Conditioning | Concatenates time with actions via `action_time_mlp_*` | Uses `time_mlp_*` for AdaRMS conditioning |
| AdaRMS | Not used | Used in action expert |
| Tokenizer Length | 48 tokens | 200 tokens |
| Discrete State Input | False (Uses `state_proj` layer) | True |
| Parameter Count | Higher (includes state embedding) | Lower (no state embedding) |
---
## Citation
If you use this work, please cite both **OpenPI** and the π₀.₅ paper:
```bibtex
@misc{openpi2024,
author = {Physical Intelligence Lab},
title = {OpenPI: PyTorch Implementation of π0 and π0.5 Policies},
year = {2024},
publisher = {GitHub},
howpublished = {\url{https://github.com/Physical-Intelligence/openpi}},
license = {Apache-2.0}
}
@misc{intelligence2025pi05visionlanguageactionmodelopenworld,
title = {π₀.₅: a Vision-Language-Action Model with Open-World Generalization},
author = {Physical Intelligence and Kevin Black and Noah Brown and James Darpinian and Karan Dhabalia and Danny Driess and Adnan Esmail and Michael Equi and Chelsea Finn and Niccolo Fusai and Manuel Y. Galliker and Dibya Ghosh and Lachy Groom and Karol Hausman and Brian Ichter and Szymon Jakubczak and Tim Jones and Liyiming Ke and Devin LeBlanc and Sergey Levine and Adrian Li-Bell and Mohith Mothukuri and Suraj Nair and Karl Pertsch and Allen Z. Ren and Lucy Xiaoyang Shi and Laura Smith and Jost Tobias Springenberg and Kyle Stachowicz and James Tanner and Quan Vuong and Homer Walke and Anna Walling and Haohuan Wang and Lili Yu and Ury Zhilinsky},
year = {2025},
eprint = {2504.16054},
archivePrefix= {arXiv},
primaryClass = {cs.LG},
url = {https://arxiv.org/abs/2504.16054},
}
```
---
## License
This port follows the **Apache 2.0 License**, consistent with the original [OpenPI repository](https://github.com/Physical-Intelligence/openpi).
@@ -1,50 +0,0 @@
#!/bin/bash
# Example script to run synthetic data generation with Qwen VLM
# This generates user prompts and robot utterances for hierarchical policy training
# Configuration
REPO_ID="lerobot/libero_10"
MODEL="Qwen/Qwen3-VL-30B-A3B-Instruct"
# or: MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/fsx/jade_choghari/outputs/libero-10-annotate-high"
BATCH_SIZE=16
TEMPERATURE=0.9
SAMPLE_INTERVAL=5.0 # generate dialogue every 5 seconds (all episodes processed)
# Run subtask annotation
# python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
# --repo-id "$REPO_ID" \
# --video-key observation.images.image \
# --output-dir "$OUTPUT_DIR" \
# --skip-existing \
# --output-repo-id "jadechoghari/libero10-annotate" \
# --batch-size "$BATCH_SIZE" \
# run synthetic data generation (all episodes processed)
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --temperature "$TEMPERATURE" \
# --batch-size "$BATCH_SIZE" \
# --sample-interval "$SAMPLE_INTERVAL" \
# --image-key observation.images.base \
# --num-image-views-per-sample 1
# for faster testing, increase sample interval:
# --sample-interval 5.0 # Samples every 5 seconds (much faster)
# to push to hub after generation:
# add --push-to-hub flag
# efficient batch processing: 4 episodes at once
python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/high_level_annotate.py \
--data-dir "/fsx/jade_choghari/outputs/libero-10-annotate" \
--output-dir "$OUTPUT_DIR" \
--video-mode \
--video-key observation.images.image \
--video-batch-size "$BATCH_SIZE" \
--sample-interval 5.0
File diff suppressed because it is too large
@@ -1,52 +0,0 @@
import torch
from huggingface_hub import HfApi
import lerobot
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.policies.factory import make_pre_post_processors
from lerobot.configs.policies import PreTrainedConfig
# /fsx/jade_choghari/data/libero_10_subtasks_kw_converted
dataset = LeRobotDataset(repo_id="lerobot/libero_10_image_subtask")
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=0,
batch_size=2,
shuffle=True,
)
cfg = PreTrainedConfig.from_pretrained(
pretrained_name_or_path="/fsx/jade_choghari/models/pi05-base",
)
cfg.dtype = "bfloat16"
pre_processor, post_processor = make_pre_post_processors(
policy_cfg=cfg,
pretrained_path="/fsx/jade_choghari/models/pi05-base",
)
batch = next(iter(dataloader))
breakpoint()
batch1 = pre_processor(batch)
breakpoint()
print(batch.keys())
# print(batch['task_index_high_level'].shape)
# print(batch['task_index_high_level'])
# print(batch['user_prompt'][0])
# print(batch['robot_utterance'][0])
# print(batch['task'][0])
valid_episode_list = []
for episode_idx in range(len(dataset.meta.episodes)):
subtask_index = dataset[episode_idx]["subtask_index"]
valid_episode_list.append(episode_idx)
print(len(valid_episode_list))
# read this parquet /fsx/jade_choghari/outputs/pgen_annotations1/meta/tasks.parquet
# import pandas as pd
# tasks_df = pd.read_parquet('/fsx/jade_choghari/outputs/pgen_annotations1/meta/tasks.parquet')
# # print all
# print(tasks_df.columns)
# breakpoint()
@@ -1,49 +0,0 @@
#!/bin/bash
# Example script to run synthetic data generation with Qwen VLM
# This generates user prompts and robot utterances for hierarchical policy training
# Configuration
REPO_ID="jadechoghari/collect-data"
MODEL="Qwen/Qwen3-VL-30B-A3B-Instruct"
# or: MODEL="Qwen/Qwen2-VL-7B-Instruct"
OUTPUT_DIR="/fsx/jade_choghari/outputs/collect-data-pgen_new"
BATCH_SIZE=32
TEMPERATURE=0.9
SAMPLE_INTERVAL=5.0 # generate dialogue every 5 seconds (all episodes processed)
# Run subtask annotation
python /admin/home/jade_choghari/lerobot/src/lerobot/policies/pi05_full/annotate/subtask_annotate.py \
--repo-id "$REPO_ID" \
--video-key observation.images.base \
--output-dir "$OUTPUT_DIR" \
--output-repo-id "jadechoghari/collect-data-with-subtasks"
# run synthetic data generation (all episodes processed)
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --temperature "$TEMPERATURE" \
# --batch-size "$BATCH_SIZE" \
# --sample-interval "$SAMPLE_INTERVAL" \
# --image-key observation.images.base \
# --num-image-views-per-sample 1
# for faster testing, increase sample interval:
# --sample-interval 5.0 # Samples every 5 seconds (much faster)
# to push to hub after generation:
# add --push-to-hub flag
# efficient batch processing: 4 episodes at once
# python examples/dataset/annotate_pgen.py \
# --repo-id "$REPO_ID" \
# --model "$MODEL" \
# --output-dir "$OUTPUT_DIR" \
# --video-mode \
# --video-key observation.images.up \
# --video-batch-size "$BATCH_SIZE" \
# --sample-interval 1.0
File diff suppressed because it is too large
@@ -1,183 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 Physical Intelligence and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.optim.optimizers import AdamWConfig
from lerobot.optim.schedulers import CosineDecayWithWarmupSchedulerConfig
from lerobot.policies.rtc.configuration_rtc import RTCConfig
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE
DEFAULT_IMAGE_SIZE = 224
@PreTrainedConfig.register_subclass("pi05_full")
@dataclass
class PI05FullConfig(PreTrainedConfig):
paligemma_variant: str = "gemma_2b"
action_expert_variant: str = "gemma_300m"
dtype: str = "float32" # Options: "bfloat16", "float32"
n_obs_steps: int = 1
chunk_size: int = 50 # Number of action steps to predict, in openpi called "action_horizon"
n_action_steps: int = 50 # Number of action steps to execute
# Shorter state and action vectors will be padded to these dimensions
max_state_dim: int = 32
max_action_dim: int = 32
# Flow matching parameters: see openpi `PI0Pytorch`
num_inference_steps: int = 10
time_sampling_beta_alpha: float = 1.5
time_sampling_beta_beta: float = 1.0
time_sampling_scale: float = 0.999
time_sampling_offset: float = 0.001
min_period: float = 4e-3
max_period: float = 4.0
# Real-Time Chunking (RTC) configuration
rtc_config: RTCConfig | None = None
image_resolution: tuple[int, int] = (
DEFAULT_IMAGE_SIZE,
DEFAULT_IMAGE_SIZE,
) # see openpi `preprocessing_pytorch.py`
# Add empty images. Used to add empty cameras when no image features are present.
empty_cameras: int = 0
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
"VISUAL": NormalizationMode.IDENTITY,
"STATE": NormalizationMode.MEAN_STD, # Pi0.5 uses quantiles for state
"ACTION": NormalizationMode.MEAN_STD, # Pi0.5 uses quantiles for action
}
)
action_tokenizer_name: str = "physical-intelligence/fast"
text_tokenizer_name: str = "google/paligemma-3b-pt-224"
max_action_tokens: int = 256
fast_skip_tokens: int = 128
# subtask stuff
max_decoding_steps: int = 200
temperature: float = 0.0
subtask_regeneration_interval: float = 1.0 # Regenerate subtask tokens every N seconds (0 = every call)
# Training settings
gradient_checkpointing: bool = False # Enable gradient checkpointing for memory optimization
compile_model: bool = False # Whether to use torch.compile for model optimization
compile_mode: str = "max-autotune" # Torch compile mode
device: str | None = None # Device to use for the model (None = auto-detect)
# Finetuning settings
freeze_vision_encoder: bool = False # Freeze only the vision encoder
train_expert_only: bool = False # Freeze entire VLM, train only action expert and projections
knowledge_insulation: bool = True # Enable knowledge insulation in attention (blocks gradients from action to VLM K/V)
# Loss weights (used when knowledge_insulation is enabled)
loss_weight_flow: float = 1.0 # Weight for flow matching MSE loss (continuous actions)
loss_weight_action_ce: float = 1.0 # Weight for FAST action token cross-entropy loss
loss_weight_subtask_ce: float = 1.0 # Weight for subtask token cross-entropy loss
# Optimizer settings: see openpi `AdamW`
optimizer_lr: float = 2.5e-5 # see openpi `CosineDecaySchedule: peak_lr`
optimizer_betas: tuple[float, float] = (0.9, 0.95)
optimizer_eps: float = 1e-8
optimizer_weight_decay: float = 0.01
optimizer_grad_clip_norm: float = 1.0
# Scheduler settings: see openpi `CosineDecaySchedule`
# Note: These will auto-scale if --steps < scheduler_decay_steps
# For example, --steps=3000 will scale warmup to 100 and decay to 3000
scheduler_warmup_steps: int = 1_000
scheduler_decay_steps: int = 30_000
scheduler_decay_lr: float = 2.5e-6
tokenizer_max_length: int = 48 # see openpi `__post_init__`
def __post_init__(self):
super().__post_init__()
# Validate configuration
if self.n_action_steps > self.chunk_size:
raise ValueError(
f"n_action_steps ({self.n_action_steps}) cannot be greater than chunk_size ({self.chunk_size})"
)
if self.paligemma_variant not in ["gemma_300m", "gemma_2b"]:
raise ValueError(f"Invalid paligemma_variant: {self.paligemma_variant}")
if self.action_expert_variant not in ["gemma_300m", "gemma_2b"]:
raise ValueError(f"Invalid action_expert_variant: {self.action_expert_variant}")
if self.dtype not in ["bfloat16", "float32"]:
raise ValueError(f"Invalid dtype: {self.dtype}")
def validate_features(self) -> None:
"""Validate and set up input/output features."""
for i in range(self.empty_cameras):
key = OBS_IMAGES + f".empty_camera_{i}"
empty_camera = PolicyFeature(
type=FeatureType.VISUAL,
shape=(3, *self.image_resolution), # Use configured image resolution
)
self.input_features[key] = empty_camera
if OBS_STATE not in self.input_features:
state_feature = PolicyFeature(
type=FeatureType.STATE,
shape=(self.max_state_dim,), # Padded to max_state_dim
)
self.input_features[OBS_STATE] = state_feature
if ACTION not in self.output_features:
action_feature = PolicyFeature(
type=FeatureType.ACTION,
shape=(self.max_action_dim,), # Padded to max_action_dim
)
self.output_features[ACTION] = action_feature
def get_optimizer_preset(self) -> AdamWConfig:
return AdamWConfig(
lr=self.optimizer_lr,
betas=self.optimizer_betas,
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
grad_clip_norm=self.optimizer_grad_clip_norm,
)
def get_scheduler_preset(self) -> CosineDecayWithWarmupSchedulerConfig:
return CosineDecayWithWarmupSchedulerConfig(
peak_lr=self.optimizer_lr,
decay_lr=self.scheduler_decay_lr,
num_warmup_steps=self.scheduler_warmup_steps,
num_decay_steps=self.scheduler_decay_steps,
)
@property
def observation_delta_indices(self) -> None:
return None
@property
def action_delta_indices(self) -> list:
return list(range(self.chunk_size))
@property
def reward_delta_indices(self) -> None:
return None
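Reviewer note on the scheduler comment in this config ("--steps=3000 will scale warmup to 100 and decay to 3000"): the scaling logic itself is not part of this diff. The sketch below assumes the scaling is proportional, which is consistent with the numbers in the comment. `auto_scale_schedule` is a hypothetical helper, not a lerobot function.

```python
def auto_scale_schedule(
    total_steps: int,
    warmup_steps: int = 1_000,
    decay_steps: int = 30_000,
) -> tuple[int, int]:
    """Hypothetical helper: shrink warmup and decay when training is shorter than the decay horizon."""
    if total_steps >= decay_steps:
        # Training is long enough, keep the configured schedule unchanged.
        return warmup_steps, decay_steps
    # Assumption: warmup shrinks by the same ratio as the decay horizon.
    scaled_warmup = warmup_steps * total_steps // decay_steps
    return scaled_warmup, total_steps


if __name__ == "__main__":
    print(auto_scale_schedule(3_000))   # defaults taken from PI05FullConfig
    print(auto_scale_schedule(50_000))
```

With the defaults from `PI05FullConfig`, a 3000-step run gives a 100-step warmup and a 3000-step decay, matching the comment.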
@@ -1,92 +0,0 @@
import torch
from huggingface_hub import HfApi
import lerobot
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
# import make_pre_post_processors
from lerobot.policies.factory import make_pre_post_processors
from lerobot.policies.pi05.configuration_pi05 import PI05Config
from lerobot.policies.factory import make_policy, make_policy_config
from lerobot.configs.policies import PreTrainedConfig
cfg = PreTrainedConfig.from_pretrained(
pretrained_name_or_path="/fsx/jade_choghari/models/pi05-base",
)
cfg.dtype = "bfloat16"
pre_processor, post_processor = make_pre_post_processors(
policy_cfg=cfg,
pretrained_path="/fsx/jade_choghari/models/pi05-base",
)
delta_timestamps = {'action': [0.0, 0.03333333333333333, 0.06666666666666667, 0.1, 0.13333333333333333, 0.16666666666666666, 0.2, 0.23333333333333334, 0.26666666666666666, 0.3, 0.3333333333333333, 0.36666666666666664, 0.4, 0.43333333333333335, 0.4666666666666667, 0.5, 0.5333333333333333, 0.5666666666666667, 0.6, 0.6333333333333333, 0.6666666666666666, 0.7, 0.7333333333333333, 0.7666666666666667, 0.8, 0.8333333333333334, 0.8666666666666667, 0.9, 0.9333333333333333, 0.9666666666666667, 1.0, 1.0333333333333334, 1.0666666666666667, 1.1, 1.1333333333333333, 1.1666666666666667, 1.2, 1.2333333333333334, 1.2666666666666666, 1.3, 1.3333333333333333, 1.3666666666666667, 1.4, 1.4333333333333333, 1.4666666666666666, 1.5, 1.5333333333333334, 1.5666666666666667, 1.6, 1.6333333333333333]}
dataset = LeRobotDataset(repo_id="local", root="/fsx/jade_choghari/outputs/pgen_annotations1", delta_timestamps=delta_timestamps)
# rename map --rename_map='{
# "observation.images.side": "observation.images.base_0_rgb",
# "observation.images.up": "observation.images.left_wrist_0_rgb"
# }'
rename_map = {
"observation.images.side": "observation.images.base_0_rgb",
"observation.images.up": "observation.images.left_wrist_0_rgb"
}
policy = make_policy(
cfg=cfg,
ds_meta=dataset.meta,
rename_map=rename_map,
)
dataloader = torch.utils.data.DataLoader(
dataset,
num_workers=0,
batch_size=4,
shuffle=True,
)
batch = next(iter(dataloader))
breakpoint()
batch = pre_processor(batch)
policy.train()
# run inference
# action = policy.select_action(batch)
loss, loss_dict = policy.forward(batch)
breakpoint()
# import requests
# from PIL import Image
# from transformers import AutoProcessor
# model = policy.model.paligemma_with_expert.paligemma
# model = model.to(device="cuda", dtype=torch.bfloat16)
# model.eval()
# prompt = "Describe this image."
# url = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/pipeline-cat-chonk.jpeg"
# image = Image.open(requests.get(url, stream=True).raw)
# processor = AutoProcessor.from_pretrained(
# "google/paligemma-3b-pt-224",
# )
# inputs = processor(image, prompt, return_tensors="pt").to(model.device)
# print("generating...")
# output = model.generate(
# **inputs,
# max_new_tokens=50,
# use_cache=True, # default dynamic cache
# )
# print(processor.decode(output[0], skip_special_tokens=True))
# # other model
# from transformers import PaliGemmaForConditionalGeneration
# model = PaliGemmaForConditionalGeneration.from_pretrained(
# "google/paligemma2-3b-pt-224",
# torch_dtype=torch.bfloat16,
# device_map="auto",
# )
# model.eval()
# print("generating...")
# output = model.generate(
# **inputs,
# max_new_tokens=100,
# use_cache=True, # default dynamic cache
# )
# print("Model 2 output:")
# print(processor.decode(output[0], skip_special_tokens=True))
File diff suppressed because it is too large
@@ -1,194 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 Physical Intelligence and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from copy import deepcopy
from dataclasses import dataclass
from typing import Any
import numpy as np
import torch
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.policies.pi05_full.configuration_pi05 import PI05FullConfig
from lerobot.policies.pi05_full.modeling_pi05 import pad_vector
from lerobot.processor import (
ActionTokenizerProcessorStep,
AddBatchDimensionProcessorStep,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyAction,
PolicyProcessorPipeline,
ProcessorStep,
ProcessorStepRegistry,
RenameObservationsProcessorStep,
TokenizerProcessorStep,
UnnormalizerProcessorStep,
)
from lerobot.processor.converters import policy_action_to_transition, transition_to_policy_action
from lerobot.processor.core import EnvTransition, TransitionKey
from lerobot.utils.constants import (
OBS_STATE,
POLICY_POSTPROCESSOR_DEFAULT_NAME,
POLICY_PREPROCESSOR_DEFAULT_NAME,
)
@ProcessorStepRegistry.register(name="pi05_full_prepare_state_tokenizer_processor_step")
@dataclass
class Pi05FullPrepareStateTokenizerProcessorStep(ProcessorStep):
"""
Processor step to prepare the state and tokenize the language input.
"""
max_state_dim: int = 32
task_key: str = "task"
subtask_key: str = "subtask"
def __call__(self, transition: EnvTransition) -> EnvTransition:
transition = transition.copy()
state = transition.get(TransitionKey.OBSERVATION, {}).get(OBS_STATE)
if state is None:
raise ValueError("State is required for PI05")
user_prompts = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get(self.task_key)
if user_prompts is None:
raise ValueError("No user prompts found in complementary data")
commands = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get(self.subtask_key)
# TODO: check if this is necessary
state = deepcopy(state)
# Prepare state (pad to max_state_dim)
state = pad_vector(state, self.max_state_dim)
# State should already be normalized to [-1, 1] by the NormalizerProcessorStep that runs before this step
# Discretize into 256 bins (see openpi `PaligemmaTokenizer.tokenize()`)
state_np = state.cpu().numpy()
discretized_states = np.digitize(state_np, bins=np.linspace(-1, 1, 256 + 1)[:-1]) - 1
full_prompts = []
for i, user_prompt in enumerate(user_prompts):
cleaned_text = user_prompt.strip().replace("_", " ").replace("\n", " ")
cleaned_text = cleaned_text.lower() # all lowercase # NOTE: added by (jadechoghari)
state_str = " ".join(map(str, discretized_states[i]))
full_prompt = f"Task: {cleaned_text}, State: {state_str};\n"
full_prompts.append(full_prompt)
transition[TransitionKey.COMPLEMENTARY_DATA][self.task_key] = full_prompts
# process commands (optional)
if commands is not None:
full_commands = []
for i, command in enumerate(commands):
cleaned_text = command.strip().replace("_", " ").replace("\n", " ")
cleaned_text = cleaned_text.lower() # all lowercase # NOTE: added by (jadechoghari)
full_command = f"Subtask: {cleaned_text};\n"
full_commands.append(full_command)
transition[TransitionKey.COMPLEMENTARY_DATA][self.subtask_key] = full_commands
# note: action tokens will be processed in the ActionTokenizerProcessorStep
return transition
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
"""
This step does not alter the feature definitions.
"""
return features
def make_pi05_full_pre_post_processors(
config: PI05FullConfig,
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
) -> tuple[
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
"""
Constructs pre-processor and post-processor pipelines for the PI05 full policy.
The pre-processing pipeline prepares input data for the model by:
1. Renaming features to match pretrained configurations.
2. Adding a batch dimension.
3. Normalizing input and output features based on dataset statistics.
4. Discretizing the normalized state and building the task (and optional subtask) prompt.
5. Tokenizing the text prompt using the PaliGemma tokenizer.
6. Tokenizing the actions using the FAST action tokenizer.
7. Moving all data to the specified device.
The post-processing pipeline handles the model's output by:
1. Unnormalizing the output features to their original scale.
2. Moving data to the CPU.
Args:
config: The configuration object for the PI05 full policy.
dataset_stats: A dictionary of statistics for normalization.
Returns:
A tuple containing the configured pre-processor and post-processor pipelines.
"""
# Add remaining processors
input_steps: list[ProcessorStep] = [
RenameObservationsProcessorStep(rename_map={}), # To mimic the same processor as pretrained one
AddBatchDimensionProcessorStep(),
# NOTE: NormalizerProcessorStep MUST come before Pi05FullPrepareStateTokenizerProcessorStep
# because the tokenizer step expects normalized state in [-1, 1] range for discretization
NormalizerProcessorStep(
features={**config.input_features, **config.output_features},
norm_map=config.normalization_mapping,
stats=dataset_stats,
),
Pi05FullPrepareStateTokenizerProcessorStep(max_state_dim=config.max_state_dim),
TokenizerProcessorStep(
tokenizer_name=config.text_tokenizer_name,
max_length=config.tokenizer_max_length,
padding_side="right",
padding="max_length",
),
ActionTokenizerProcessorStep(
action_tokenizer_name=config.action_tokenizer_name,
max_action_tokens=config.max_action_tokens,
fast_skip_tokens=config.fast_skip_tokens,
paligemma_tokenizer_name=config.text_tokenizer_name,
),
DeviceProcessorStep(device=config.device),
]
output_steps: list[ProcessorStep] = [
UnnormalizerProcessorStep(
features=config.output_features, norm_map=config.normalization_mapping, stats=dataset_stats
),
DeviceProcessorStep(device="cpu"),
]
return (
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
steps=input_steps,
name=POLICY_PREPROCESSOR_DEFAULT_NAME,
),
PolicyProcessorPipeline[PolicyAction, PolicyAction](
steps=output_steps,
name=POLICY_POSTPROCESSOR_DEFAULT_NAME,
to_transition=policy_action_to_transition,
to_output=transition_to_policy_action,
),
)
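Reviewer note on `Pi05FullPrepareStateTokenizerProcessorStep`: the state discretization and prompt format can be checked without lerobot or numpy. This is a minimal sketch, assuming the state is already normalized to [-1, 1]. `discretize_state` and `build_prompt` are hypothetical names, and `bisect_right` stands in for the `np.digitize` call in the diff.

```python
from bisect import bisect_right

NUM_BINS = 256
# Left edges of 256 equal-width bins over [-1, 1);
# the same values as np.linspace(-1, 1, 256 + 1)[:-1] in the diff.
BIN_EDGES = [-1.0 + 2.0 * i / NUM_BINS for i in range(NUM_BINS)]


def discretize_state(state: list[float]) -> list[int]:
    """Map each normalized state value to a bin index in 0..255."""
    # For increasing edges, bisect_right(edges, x) - 1 behaves like np.digitize(x, edges) - 1.
    return [bisect_right(BIN_EDGES, x) - 1 for x in state]


def build_prompt(task: str, state: list[float]) -> str:
    """Clean the task text and append the discretized state, as in the processor step."""
    cleaned = task.strip().replace("_", " ").replace("\n", " ").lower()
    state_str = " ".join(map(str, discretize_state(state)))
    return f"Task: {cleaned}, State: {state_str};\n"


if __name__ == "__main__":
    print(discretize_state([-1.0, 0.0, 1.0]))
    print(repr(build_prompt("Pick_up the Cube\n", [-1.0, 0.0, 1.0])))
```

Values below -1 map to index -1 in this sketch, just as they would with `np.digitize(...) - 1`, which is why the normalizer has to run before this step.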
@@ -27,18 +27,18 @@ Usage:
# Full RA-BC computation with visualizations
python src/lerobot/policies/sarm/compute_rabc_weights.py \\
--dataset-repo-id lerobot/aloha_sim_insertion_human \\
--reward-model-path pepijn223/sarm_single_uni4
--reward-model-path <USER>/sarm_single_uni4
# Faster computation with stride (compute every 5 frames, interpolate the rest)
python src/lerobot/policies/sarm/compute_rabc_weights.py \\
--dataset-repo-id lerobot/aloha_sim_insertion_human \\
--reward-model-path pepijn223/sarm_single_uni4 \\
--reward-model-path <USER>/sarm_single_uni4 \\
--stride 5
# Visualize predictions only (no RA-BC computation)
python src/lerobot/policies/sarm/compute_rabc_weights.py \\
--dataset-repo-id lerobot/aloha_sim_insertion_human \\
--reward-model-path pepijn223/sarm_single_uni4 \\
--reward-model-path <USER>/sarm_single_uni4 \\
--visualize-only \\
--num-visualizations 5
@@ -714,12 +714,12 @@ Examples:
# Full RA-BC computation with visualizations
python src/lerobot/policies/sarm/compute_rabc_weights.py \\
--dataset-repo-id lerobot/aloha_sim_insertion_human \\
--reward-model-path pepijn223/sarm_single_uni4
--reward-model-path <USER>/sarm_single_uni4
# Visualize predictions only (no RA-BC computation)
python src/lerobot/policies/sarm/compute_rabc_weights.py \\
--dataset-repo-id lerobot/aloha_sim_insertion_human \\
--reward-model-path pepijn223/sarm_single_uni4 \\
--reward-model-path <USER>/sarm_single_uni4 \\
--visualize-only \\
--num-visualizations 10
""",
@@ -85,7 +85,7 @@ class SmolVLAConfig(PreTrainedConfig):
scheduler_decay_lr: float = 2.5e-6
vlm_model_name: str = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct" # Select the VLM backbone.
load_vlm_weights: bool = False # Set to True in case of training the expert from scratch. True when init from pretrained SmolVLA weights
load_vlm_weights: bool = False # Set to False in case of training the expert from scratch. True when init from pretrained SmolVLA weights
add_image_special_tokens: bool = False # Whether to use special image tokens around image features.
@@ -30,7 +30,7 @@ Example of finetuning the smolvla pretrained model (`smolvla_base`):
```bash
lerobot-train \
--policy.path=lerobot/smolvla_base \
--dataset.repo_id=danaaubakirova/svla_so100_task1_v3 \
--dataset.repo_id=<USER>/svla_so100_task1_v3 \
--batch_size=64 \
--steps=200000
```
@@ -40,7 +40,7 @@ and an action expert.
```bash
lerobot-train \
--policy.type=smolvla \
--dataset.repo_id=danaaubakirova/svla_so100_task1_v3 \
--dataset.repo_id=<USER>/svla_so100_task1_v3 \
--batch_size=64 \
--steps=200000
```
@@ -378,16 +378,16 @@ class SmolVLAPolicy(PreTrainedPolicy):
actions_is_pad = batch.get("actions_id_pad")
loss_dict = {}
losses = self.model.forward(images, img_masks, lang_tokens, lang_masks, state, actions, noise, time)
loss_dict["losses_after_forward"] = losses.clone()
loss_dict["losses_after_forward"] = losses.clone().mean().item()
if actions_is_pad is not None:
in_episode_bound = ~actions_is_pad
losses = losses * in_episode_bound.unsqueeze(-1)
loss_dict["losses_after_in_ep_bound"] = losses.clone()
loss_dict["losses_after_in_ep_bound"] = losses.clone().mean().item()
# Remove padding
losses = losses[:, :, : self.config.max_action_dim]
loss_dict["losses_after_rm_padding"] = losses.clone()
loss_dict["losses_after_rm_padding"] = losses.clone().mean().item()
if reduction == "none":
# Return per-sample losses (B,) by averaging over time and action dims
+2
@@ -44,6 +44,7 @@ from .hil_processor import (
AddTeleopActionAsComplimentaryDataStep,
AddTeleopEventsAsInfoStep,
GripperPenaltyProcessorStep,
GymHILAdapterProcessorStep,
ImageCropResizeProcessorStep,
InterventionActionProcessorStep,
RewardClassifierProcessorStep,
@@ -87,6 +88,7 @@ __all__ = [
"DoneProcessorStep",
"EnvAction",
"EnvTransition",
"GymHILAdapterProcessorStep",
"GripperPenaltyProcessorStep",
"hotswap_stats",
"IdentityProcessorStep",
+1 -3
@@ -171,11 +171,9 @@ def _extract_complementary_data(batch: dict[str, Any]) -> dict[str, Any]:
subtask_key = {"subtask": batch["subtask"]} if "subtask" in batch else {}
index_key = {"index": batch["index"]} if "index" in batch else {}
task_index_key = {"task_index": batch["task_index"]} if "task_index" in batch else {}
user_prompt_key = {"user_prompt": batch["user_prompt"]} if "user_prompt" in batch else {}
subtask_key = {"subtask": batch["subtask"]} if "subtask" in batch else {}
episode_index_key = {"episode_index": batch["episode_index"]} if "episode_index" in batch else {}
return {**pad_keys, **task_key, **index_key, **task_index_key, **episode_index_key, **user_prompt_key, **subtask_key}
return {**pad_keys, **task_key, **subtask_key, **index_key, **task_index_key, **episode_index_key}
def create_transition(
+4 -6
@@ -17,7 +17,7 @@ from dataclasses import dataclass
import torch
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.utils.constants import OBS_IMAGES, OBS_PREFIX, OBS_STATE, OBS_STR
from .pipeline import ObservationProcessorStep, ProcessorStepRegistry
@@ -92,7 +92,7 @@ class LiberoProcessorStep(ObservationProcessorStep):
# copy over non-STATE features
for ft, feats in features.items():
if ft != PipelineFeatureType.STATE:
if ft != FeatureType.STATE:
new_features[ft] = feats.copy()
# rebuild STATE features
@@ -100,13 +100,11 @@ class LiberoProcessorStep(ObservationProcessorStep):
# add our new flattened state
state_feats[OBS_STATE] = PolicyFeature(
key=OBS_STATE,
type=FeatureType.STATE,
shape=(8,), # [eef_pos(3), axis_angle(3), gripper(2)]
dtype="float32",
description=("Concatenated end-effector position (3), axis-angle (3), and gripper qpos (2)."),
)
new_features[PipelineFeatureType.STATE] = state_feats
new_features[FeatureType.STATE] = state_feats
return new_features
@@ -20,6 +20,7 @@ from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from .converters import to_tensor
from .core import EnvAction, EnvTransition, PolicyAction
from .hil_processor import TELEOP_ACTION_KEY
from .pipeline import ActionProcessorStep, ProcessorStep, ProcessorStepRegistry
@@ -89,6 +90,13 @@ class Numpy2TorchActionProcessorStep(ProcessorStep):
torch_action = to_tensor(action, dtype=None) # Preserve original dtype
new_transition[TransitionKey.ACTION] = torch_action
complementary_data = new_transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
if TELEOP_ACTION_KEY in complementary_data:
teleop_action = complementary_data[TELEOP_ACTION_KEY]
if isinstance(teleop_action, EnvAction):
complementary_data[TELEOP_ACTION_KEY] = to_tensor(teleop_action)
new_transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data
return new_transition
def transform_features(
+31
@@ -312,6 +312,37 @@ class TimeLimitProcessorStep(TruncatedProcessorStep):
return features
@ProcessorStepRegistry.register("gym_hil_adapter_processor")
class GymHILAdapterProcessorStep(ProcessorStep):
"""
Adapts the output of the `gym-hil` environment to the format expected by `lerobot` processors.
This step normalizes the `transition` object by:
1. Copying `teleop_action` from `info` to `complementary_data`.
2. Copying `is_intervention` from `info` (using the string key) to `info` (using the enum key).
"""
def __call__(self, transition: EnvTransition) -> EnvTransition:
info = transition.get(TransitionKey.INFO, {})
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
if TELEOP_ACTION_KEY in info:
complementary_data[TELEOP_ACTION_KEY] = info[TELEOP_ACTION_KEY]
if "is_intervention" in info:
info[TeleopEvents.IS_INTERVENTION] = info["is_intervention"]
transition[TransitionKey.INFO] = info
transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data
return transition
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
return features
@dataclass
@ProcessorStepRegistry.register("gripper_penalty_processor")
class GripperPenaltyProcessorStep(ProcessorStep):
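Reviewer note on `GymHILAdapterProcessorStep`: the key remapping can be exercised with plain dictionaries. This is a sketch under stated assumptions. The string keys `"info"` and `"complementary_data"` stand in for `TransitionKey`, the `TeleopEvents` enum below is a local stand-in, and the value `"teleop_action"` for `TELEOP_ACTION_KEY` is inferred from the docstring, not confirmed by the diff.

```python
from enum import Enum

TELEOP_ACTION_KEY = "teleop_action"  # assumed value, taken from the docstring wording


class TeleopEvents(Enum):
    """Local stand-in for the lerobot enum used as the info key."""

    IS_INTERVENTION = "is_intervention"


def adapt_gym_hil_transition(transition: dict) -> dict:
    """Mirror the adapter step: copy teleop data and the intervention flag."""
    info = transition.get("info", {})
    complementary_data = transition.get("complementary_data", {})

    # 1. Copy the teleop action from info into complementary_data.
    if TELEOP_ACTION_KEY in info:
        complementary_data[TELEOP_ACTION_KEY] = info[TELEOP_ACTION_KEY]

    # 2. Duplicate the intervention flag under the enum key.
    if "is_intervention" in info:
        info[TeleopEvents.IS_INTERVENTION] = info["is_intervention"]

    transition["info"] = info
    transition["complementary_data"] = complementary_data
    return transition


if __name__ == "__main__":
    raw = {"info": {"teleop_action": [0.1, 0.0, 0.0, 1.0], "is_intervention": True}}
    print(adapt_gym_hil_transition(raw))
```

Because the stand-in is a plain `Enum` and not a `str` subclass, the enum key and the string key stay distinct entries in `info`, which is what the docstring describes.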
+1 -1
@@ -413,7 +413,7 @@ class DataProcessorPipeline(HubMixin, Generic[TInput, TOutput]):
Args:
save_directory: The directory where the pipeline will be saved. If None, saves to
HF_LEROBOT_HOME/processors/{sanitized_pipeline_name}.
repo_id: ID of your repository on the Hub. Used only if `push_to_hub=True`.
repo_id: ID of your repository on the Hub. Used only if `push_to_hub=true`.
push_to_hub: Whether or not to push your object to the Hugging Face Hub after saving it.
card_kwargs: Additional arguments passed to the card template to customize the card.
config_filename: The name of the JSON configuration file. If None, a name is
+4 -89
@@ -37,9 +37,6 @@ from lerobot.utils.constants import (
OBS_LANGUAGE_SUBTASK_ATTENTION_MASK,
OBS_LANGUAGE_SUBTASK_TOKENS,
OBS_LANGUAGE_TOKENS,
OBS_LANGUAGE_USER_PROMPT,
OBS_LANGUAGE_USER_PROMPT_ATTENTION_MASK,
OBS_LANGUAGE_USER_PROMPT_TOKENS,
)
from lerobot.utils.import_utils import _transformers_available
@@ -144,32 +141,6 @@ class TokenizerProcessorStep(ObservationProcessorStep):
return None
def get_user_prompt(self, transition: EnvTransition) -> list[str] | None:
"""
Extracts the user_prompt from the transition's complementary data.
Args:
transition: The environment transition.
Returns:
A list of user_prompt strings, or None if the user_prompt key is not found or the value is None.
"""
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA)
if complementary_data is None:
return None
user_prompt = complementary_data.get("user_prompt")
if user_prompt is None:
return None
# Standardize to a list of strings for the tokenizer
if isinstance(user_prompt, str):
return [user_prompt]
elif isinstance(user_prompt, list) and all(isinstance(t, str) for t in user_prompt):
return user_prompt
return None
def get_subtask(self, transition: EnvTransition) -> list[str] | None:
"""
Extracts the subtask from the transition's complementary data.
@@ -198,16 +169,16 @@ class TokenizerProcessorStep(ObservationProcessorStep):
def observation(self, observation: RobotObservation) -> RobotObservation:
"""
Tokenizes the task description and user_prompt (if available) and adds them to the observation dictionary.
Tokenizes the task description and adds it to the observation dictionary.
This method retrieves the task and user_prompt, tokenizes them, moves the resulting tensors to the
This method retrieves the task, tokenizes it, moves the resulting tensors to the
same device as other data in the transition, and updates the observation.
Args:
observation: The original observation dictionary.
Returns:
The updated observation dictionary including token IDs and attention masks.
The updated observation dictionary including token IDs and an attention mask.
"""
task = self.get_task(self.transition)
if task is None:
@@ -233,45 +204,11 @@ class TokenizerProcessorStep(ObservationProcessorStep):
new_observation[OBS_LANGUAGE_TOKENS] = tokenized_prompt["input_ids"]
new_observation[OBS_LANGUAGE_ATTENTION_MASK] = tokenized_prompt["attention_mask"].to(dtype=torch.bool)
# Tokenize user_prompt if available
user_prompt = self.get_user_prompt(self.transition)
if user_prompt is not None:
tokenized_user_prompt = self._tokenize_text(user_prompt)
# Move new tokenized tensors to the detected device
if target_device is not None:
tokenized_user_prompt = {
k: v.to(target_device) if isinstance(v, torch.Tensor) else v
for k, v in tokenized_user_prompt.items()
}
# Add tokenized user_prompt to the observation
new_observation[OBS_LANGUAGE_USER_PROMPT_TOKENS] = tokenized_user_prompt["input_ids"]
new_observation[OBS_LANGUAGE_USER_PROMPT_ATTENTION_MASK] = tokenized_user_prompt["attention_mask"].to(dtype=torch.bool)
# Tokenize subtask if available
subtask = self.get_subtask(self.transition)
if subtask is not None:
tokenized_subtask = self._tokenize_text(subtask)
# Add EOS token at the end of each subtask sequence (before padding)
eos_token_id = self.input_tokenizer.eos_token_id
input_ids = tokenized_subtask["input_ids"]
attention_mask = tokenized_subtask["attention_mask"]
for i in range(input_ids.size(0)):
# Find the length of actual tokens (sum of attention mask)
seq_len = attention_mask[i].sum().item()
max_len = input_ids.size(1)
if seq_len >= max_len:
raise ValueError(
f"No room to append EOS: seq_len={seq_len} equals max_length={max_len}. "
"Increase max_length or tokenize with padding=False then pad after adding EOS."
)
# Add EOS token at the end
input_ids[i, seq_len] = eos_token_id
attention_mask[i, seq_len] = 1
# Move new tokenized tensors to the detected device
if target_device is not None:
tokenized_subtask = {
@@ -383,28 +320,6 @@ class TokenizerProcessorStep(ObservationProcessorStep):
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
# Add features for user_prompt tokens and attention mask if they don't already exist
if OBS_LANGUAGE_USER_PROMPT_TOKENS not in features[PipelineFeatureType.OBSERVATION]:
features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_USER_PROMPT_TOKENS] = PolicyFeature(
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
if OBS_LANGUAGE_USER_PROMPT_ATTENTION_MASK not in features[PipelineFeatureType.OBSERVATION]:
features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_USER_PROMPT_ATTENTION_MASK] = PolicyFeature(
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
# Add features for subtask tokens and attention mask if they don't already exist
if OBS_LANGUAGE_SUBTASK_TOKENS not in features[PipelineFeatureType.OBSERVATION]:
features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS] = PolicyFeature(
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
if OBS_LANGUAGE_SUBTASK_ATTENTION_MASK not in features[PipelineFeatureType.OBSERVATION]:
features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK] = PolicyFeature(
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
return features
@@ -658,4 +573,4 @@ class ActionTokenizerProcessorStep(ActionProcessorStep):
Returns:
The updated dictionary of policy features.
"""
return features
return features
+13 -2
@@ -36,6 +36,7 @@ from lerobot.processor import (
DeviceProcessorStep,
EnvTransition,
GripperPenaltyProcessorStep,
GymHILAdapterProcessorStep,
ImageCropResizeProcessorStep,
InterventionActionProcessorStep,
MapDeltaActionToRobotActionStep,
@@ -379,6 +380,7 @@ def make_processors(
]
env_pipeline_steps = [
GymHILAdapterProcessorStep(),
Numpy2TorchActionProcessorStep(),
VanillaObservationProcessorStep(),
AddBatchDimensionProcessorStep(),
@@ -608,7 +610,14 @@ def control_loop(
dataset = None
if cfg.mode == "record":
action_features = teleop_device.action_features
if teleop_device:
action_features = teleop_device.action_features
else:
action_features = {
"dtype": "float32",
"shape": (4,),
"names": ["delta_x", "delta_y", "delta_z", "gripper"],
}
features = {
ACTION: action_features,
REWARD: {"dtype": "float32", "shape": (1,), "names": None},
@@ -656,7 +665,7 @@ def control_loop(
# Create a neutral action (no movement)
neutral_action = torch.tensor([0.0, 0.0, 0.0], dtype=torch.float32)
if use_gripper:
neutral_action = torch.cat([neutral_action, torch.tensor([1.0])]) # Gripper stay
neutral_action = torch.cat([neutral_action, torch.tensor([0.0])]) # Gripper stay
# Use the new step function
transition = step_env_and_process_transition(
@@ -725,6 +734,8 @@ def control_loop(
precise_sleep(max(dt - (time.perf_counter() - step_start_time), 0.0))
if dataset is not None and cfg.dataset.push_to_hub:
logging.info("Finalizing dataset before pushing to hub")
dataset.finalize()
logging.info("Pushing dataset to hub")
dataset.push_to_hub()
@@ -19,6 +19,7 @@ from functools import cached_property
from lerobot.processor import RobotAction, RobotObservation
from lerobot.robots.openarm_follower import OpenArmFollower, OpenArmFollowerConfig
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from ..robot import Robot
from .config_bi_openarm_follower import BiOpenArmFollowerConfig
@@ -112,6 +113,7 @@ class BiOpenArmFollower(Robot):
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
@check_if_already_connected
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
@@ -133,6 +135,7 @@ class BiOpenArmFollower(Robot):
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
@check_if_not_connected
def get_observation(self) -> RobotObservation:
obs_dict = {}
@@ -146,6 +149,7 @@ class BiOpenArmFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(
self,
action: RobotAction,
@@ -170,6 +174,7 @@ class BiOpenArmFollower(Robot):
return {**prefixed_sent_action_left, **prefixed_sent_action_right}
@check_if_not_connected
def disconnect(self):
self.left_arm.disconnect()
self.right_arm.disconnect()
@@ -19,6 +19,7 @@ from functools import cached_property
from lerobot.processor import RobotAction, RobotObservation
from lerobot.robots.so_follower import SOFollower, SOFollowerRobotConfig
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from ..robot import Robot
from .config_bi_so_follower import BiSOFollowerConfig
@@ -96,6 +97,7 @@ class BiSOFollower(Robot):
def is_connected(self) -> bool:
return self.left_arm.is_connected and self.right_arm.is_connected
@check_if_already_connected
def connect(self, calibrate: bool = True) -> None:
self.left_arm.connect(calibrate)
self.right_arm.connect(calibrate)
@@ -116,6 +118,7 @@ class BiSOFollower(Robot):
self.left_arm.setup_motors()
self.right_arm.setup_motors()
@check_if_not_connected
def get_observation(self) -> RobotObservation:
obs_dict = {}
@@ -129,6 +132,7 @@ class BiSOFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
# Remove "left_" prefix
left_action = {
@@ -148,6 +152,7 @@ class BiSOFollower(Robot):
return {**prefixed_sent_action_left, **prefixed_sent_action_right}
@check_if_not_connected
def disconnect(self):
self.left_arm.disconnect()
self.right_arm.disconnect()
+1 -1
@@ -140,7 +140,7 @@ class HopeJrArm(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
+1 -1
@@ -171,7 +171,7 @@ class HopeJrHand(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
@@ -193,7 +193,7 @@ class KochFollower(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
+1 -1
@@ -360,7 +360,7 @@ class LeKiwi(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
@@ -176,7 +176,7 @@ class OmxFollower(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
@@ -23,7 +23,7 @@ from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.damiao import DamiaoMotorsBus
from lerobot.processor import RobotAction, RobotObservation
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from ..robot import Robot
from ..utils import ensure_safe_goal_position
@@ -119,6 +119,7 @@ class OpenArmFollower(Robot):
"""Check if robot is connected."""
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
@check_if_already_connected
def connect(self, calibrate: bool = True) -> None:
"""
Connect to the robot and optionally calibrate.
@@ -126,8 +127,6 @@ class OpenArmFollower(Robot):
We assume that at connection time, the arms are in a safe rest position,
and torque can be safely disabled to run calibration if needed.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
# Connect to CAN bus
logger.info(f"Connecting arm on {self.config.port}...")
@@ -219,6 +218,7 @@ class OpenArmFollower(Robot):
"Motor ID configuration is typically done via manufacturer tools for CAN motors."
)
@check_if_not_connected
def get_observation(self) -> RobotObservation:
"""
Get current observation from robot including position, velocity, and torque.
@@ -228,9 +228,6 @@ class OpenArmFollower(Robot):
"""
start = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict: dict[str, Any] = {}
states = self.bus.sync_read_all_states()
@@ -244,7 +241,7 @@ class OpenArmFollower(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
@@ -253,6 +250,7 @@ class OpenArmFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(
self,
action: RobotAction,
@@ -272,8 +270,6 @@ class OpenArmFollower(Robot):
Returns:
The action actually sent (potentially clipped)
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
@@ -333,10 +329,9 @@ class OpenArmFollower(Robot):
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
@check_if_not_connected
def disconnect(self):
"""Disconnect from robot."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Disconnect CAN bus
self.bus.disconnect(self.config.disable_torque_on_disconnect)
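Review note on the hunks above: the inline `is_connected` checks are replaced by the `check_if_already_connected` and `check_if_not_connected` decorators. The diff does not show `lerobot.utils.decorators`, so the following is only a minimal sketch of how such guards could be written. The error classes and `FakeArm` are stand-ins defined here for illustration, not the library's own.

```python
from functools import wraps


class DeviceNotConnectedError(ConnectionError):
    """Stand-in for the error raised when a device is not connected."""


class DeviceAlreadyConnectedError(ConnectionError):
    """Stand-in for the error raised on a second connect()."""


def check_if_not_connected(method):
    """Raise before running `method` if the device is not connected."""

    @wraps(method)
    def wrapper(self, *args, **kwargs):
        if not self.is_connected:
            raise DeviceNotConnectedError(f"{self} is not connected.")
        return method(self, *args, **kwargs)

    return wrapper


def check_if_already_connected(method):
    """Raise before running `method` if the device is already connected."""

    @wraps(method)
    def wrapper(self, *args, **kwargs):
        if self.is_connected:
            raise DeviceAlreadyConnectedError(f"{self} already connected")
        return method(self, *args, **kwargs)

    return wrapper


class FakeArm:
    """Hypothetical device used only to exercise the two guards."""

    def __init__(self):
        self._connected = False

    @property
    def is_connected(self):
        return self._connected

    @check_if_already_connected
    def connect(self):
        self._connected = True

    @check_if_not_connected
    def disconnect(self):
        self._connected = False
```

With guards like these, every method body can assume the connection state it needs, which is why the hunks above can delete the repeated `if not self.is_connected: raise ...` blocks.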
+1 -1
@@ -180,7 +180,7 @@ class Reachy2Robot(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
obs_dict[cam_key] = cam.async_read()
obs_dict[cam_key] = cam.read_latest()
return obs_dict
@@ -40,7 +40,7 @@ class SOFollowerConfig:
cameras: dict[str, CameraConfig] = field(default_factory=dict)
# Set to `True` for backward compatibility with previous policies/dataset
use_degrees: bool = False
use_degrees: bool = True
@RobotConfig.register_subclass("so101_follower")
@@ -187,7 +187,7 @@ class SOFollower(Robot):
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
obs_dict[cam_key] = cam.read_latest()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
+1 -1
@@ -324,7 +324,7 @@ class UnitreeG1(Robot):
# Cameras - read images from ZMQ cameras
for cam_name, cam in self._cameras.items():
obs[cam_name] = cam.async_read()
obs[cam_name] = cam.read_latest()
return obs
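Review note on the `async_read()` to `read_latest()` change repeated across the robot classes above: the diff only shows the call being renamed, not the camera implementation. One plausible reading is that `read_latest()` returns the most recently captured frame without waiting for a new one. The sketch below illustrates that idea only; `LatestFrameBuffer` and `publish` are hypothetical names, and the real camera classes may behave differently.

```python
import threading


class LatestFrameBuffer:
    """Hypothetical holder for the newest frame produced by a capture thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self._frame = None

    def publish(self, frame):
        """Called by the capture side; overwrites any older frame."""
        with self._lock:
            self._frame = frame

    def read_latest(self):
        """Return the newest frame immediately, without waiting for a fresh one."""
        with self._lock:
            if self._frame is None:
                raise RuntimeError("No frame has been captured yet.")
            return self._frame
```

Under this assumption, a control loop that reads faster than the camera produces frames would see the same frame twice rather than block.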
+25 -12
@@ -47,16 +47,14 @@ local$ rerun lerobot_pusht_episode_0.rrd
```
- Visualize data stored on a distant machine through streaming:
(You need to forward the websocket port to the distant machine, with
`ssh -L 9087:localhost:9087 username@remote-host`)
```
distant$ lerobot-dataset-viz \
--repo-id lerobot/pusht \
--episode-index 0 \
--mode distant \
--ws-port 9087
--grpc-port 9876
local$ rerun ws://localhost:9087
local$ rerun rerun+http://IP:GRPC_PORT/proxy
```
"""
@@ -75,6 +73,7 @@ import tqdm
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.constants import ACTION, DONE, OBS_STATE, REWARD
from lerobot.utils.utils import init_logging
def to_hwc_uint8_numpy(chw_float32_torch: torch.Tensor) -> np.ndarray:
@@ -93,10 +92,11 @@ def visualize_dataset(
num_workers: int = 0,
mode: str = "local",
web_port: int = 9090,
ws_port: int = 9087,
grpc_port: int = 9876,
save: bool = False,
output_dir: Path | None = None,
display_compressed_images: bool = False,
**kwargs,
) -> Path | None:
if save:
assert output_dir is not None, (
@@ -126,7 +126,9 @@ def visualize_dataset(
gc.collect()
if mode == "distant":
rr.serve_web_viewer(open_browser=False, web_port=web_port)
server_uri = rr.serve_grpc(grpc_port=grpc_port)
logging.info(f"Connect to a Rerun Server: rerun rerun+http://IP:{grpc_port}/proxy")
rr.serve_web_viewer(open_browser=False, web_port=web_port, connect_to=server_uri)
logging.info("Logging to Rerun")
@@ -226,7 +228,7 @@ def main():
"Mode of viewing between 'local' or 'distant'. "
"'local' requires data to be on a local machine. It spawns a viewer to visualize the data locally. "
"'distant' creates a server on the distant machine where the data is stored. "
"Visualize the data by connecting to the server with `rerun ws://localhost:PORT` on the local machine."
"Visualize the data by connecting to the server with `rerun rerun+http://IP:GRPC_PORT/proxy` on the local machine."
),
)
parser.add_argument(
@@ -238,8 +240,13 @@ def main():
parser.add_argument(
"--ws-port",
type=int,
default=9087,
help="Web socket port for rerun.io when `--mode distant` is set.",
help="deprecated, please use --grpc-port instead.",
)
parser.add_argument(
"--grpc-port",
type=int,
default=9876,
help="gRPC port for rerun.io when `--mode distant` is set.",
)
parser.add_argument(
"--save",
@@ -265,9 +272,7 @@ def main():
parser.add_argument(
"--display-compressed-images",
type=bool,
required=True,
default=False,
action="store_true",
help="If set, display compressed images in Rerun instead of uncompressed ones.",
)
@@ -277,6 +282,14 @@ def main():
root = kwargs.pop("root")
tolerance_s = kwargs.pop("tolerance_s")
if kwargs["ws_port"] is not None:
logging.warning(
"--ws-port is deprecated and will be removed in future versions. Please use --grpc-port instead."
)
logging.warning("Setting grpc_port to ws_port value.")
kwargs["grpc_port"] = kwargs.pop("ws_port")
init_logging()
logging.info("Loading dataset")
dataset = LeRobotDataset(repo_id, episodes=[args.episode_index], root=root, tolerance_s=tolerance_s)
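Review note on the `--ws-port` deprecation above: the flattened hunk does not make clear whether `default=9087` is kept on `--ws-port`. The check `kwargs["ws_port"] is not None` only distinguishes "flag given" from "flag omitted" if the default is `None`. A minimal sketch of the pattern under that assumption follows; `parse_ports` is a hypothetical helper, and unlike the hunk it always removes the deprecated key so it never reaches downstream code.

```python
import argparse
import logging


def parse_ports(argv):
    """Parse port flags and fold the deprecated --ws-port into grpc_port."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--ws-port",
        type=int,
        default=None,  # assumption: None means the flag was not passed
        help="deprecated, please use --grpc-port instead.",
    )
    parser.add_argument("--grpc-port", type=int, default=9876)
    kwargs = vars(parser.parse_args(argv))

    ws_port = kwargs.pop("ws_port")  # always drop the deprecated key
    if ws_port is not None:
        logging.warning("--ws-port is deprecated. Please use --grpc-port instead.")
        kwargs["grpc_port"] = ws_port
    return kwargs
```

Calling `parse_ports(["--ws-port", "9087"])` yields a dict that contains only `grpc_port`, set to the value given for the old flag.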
+104 -38
@@ -24,96 +24,112 @@ When new_repo_id is specified, creates a new dataset.
Usage Examples:
Delete episodes 0, 2, and 5 from a dataset:
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type delete_episodes \
--operation.episode_indices "[0, 2, 5]"
Delete episodes and save to a new dataset:
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--new_repo_id lerobot/pusht_filtered \
--operation.type delete_episodes \
--operation.episode_indices "[0, 2, 5]"
Split dataset by fractions:
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type split \
--operation.splits '{"train": 0.8, "val": 0.2}'
Split dataset by episode indices:
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type split \
--operation.splits '{"train": [0, 1, 2, 3], "val": [4, 5]}'
Split into more than two splits:
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type split \
--operation.splits '{"train": 0.6, "val": 0.2, "test": 0.2}'
Merge multiple datasets:
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht_merged \
--operation.type merge \
--operation.repo_ids "['lerobot/pusht_train', 'lerobot/pusht_val']"
Remove camera feature:
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type remove_feature \
--operation.feature_names "['observation.images.top']"
Modify tasks - set a single task for all episodes (WARNING: modifies in-place):
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type modify_tasks \
--operation.new_task "Pick up the cube and place it"
Modify tasks - set different tasks for specific episodes (WARNING: modifies in-place):
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type modify_tasks \
--operation.episode_tasks '{"0": "Task A", "1": "Task B", "2": "Task A"}'
Modify tasks - set default task with overrides for specific episodes (WARNING: modifies in-place):
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type modify_tasks \
--operation.new_task "Default task" \
--operation.episode_tasks '{"5": "Special task for episode 5"}'
Convert image dataset to video format and save locally:
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht_image \
--operation.type convert_image_to_video \
--operation.output_dir /path/to/output/pusht_video
Convert image dataset to video format and save with new repo_id:
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht_image \
--new_repo_id lerobot/pusht_video \
--operation.type convert_image_to_video
Convert image dataset to video format and push to hub:
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--repo_id lerobot/pusht_image \
--new_repo_id lerobot/pusht_video \
--operation.type convert_image_to_video \
--push_to_hub true
Show dataset information:
lerobot-edit-dataset \
--repo_id lerobot/pusht_image \
--operation.type info \
--operation.show_features true
Show dataset information without feature details:
lerobot-edit-dataset \
--repo_id lerobot/pusht_image \
--operation.type info \
--operation.show_features false
Using JSON config file:
python -m lerobot.scripts.lerobot_edit_dataset \
lerobot-edit-dataset \
--config_path path/to/edit_config.json
"""
import abc
import logging
import shutil
import sys
from dataclasses import dataclass
from pathlib import Path
import draccus
from lerobot.configs import parser
from lerobot.datasets.dataset_tools import (
convert_image_to_video_dataset,
@@ -129,39 +145,46 @@ from lerobot.utils.utils import init_logging
@dataclass
class DeleteEpisodesConfig:
type: str = "delete_episodes"
class OperationConfig(draccus.ChoiceRegistry, abc.ABC):
@property
def type(self) -> str:
return self.get_choice_name(self.__class__)
@OperationConfig.register_subclass("delete_episodes")
@dataclass
class DeleteEpisodesConfig(OperationConfig):
episode_indices: list[int] | None = None
@OperationConfig.register_subclass("split")
@dataclass
class SplitConfig:
type: str = "split"
class SplitConfig(OperationConfig):
splits: dict[str, float | list[int]] | None = None
@OperationConfig.register_subclass("merge")
@dataclass
class MergeConfig:
type: str = "merge"
class MergeConfig(OperationConfig):
repo_ids: list[str] | None = None
@OperationConfig.register_subclass("remove_feature")
@dataclass
class RemoveFeatureConfig:
type: str = "remove_feature"
class RemoveFeatureConfig(OperationConfig):
feature_names: list[str] | None = None
@OperationConfig.register_subclass("modify_tasks")
@dataclass
class ModifyTasksConfig:
type: str = "modify_tasks"
class ModifyTasksConfig(OperationConfig):
new_task: str | None = None
episode_tasks: dict[str, str] | None = None
@OperationConfig.register_subclass("convert_image_to_video")
@dataclass
class ConvertImageToVideoConfig:
type: str = "convert_image_to_video"
class ConvertImageToVideoConfig(OperationConfig):
output_dir: str | None = None
vcodec: str = "libsvtav1"
pix_fmt: str = "yuv420p"
@@ -174,17 +197,17 @@ class ConvertImageToVideoConfig:
max_frames_per_batch: int | None = None
@OperationConfig.register_subclass("info")
@dataclass
class InfoConfig(OperationConfig):
type: str = "info"
show_features: bool = False
@dataclass
class EditDatasetConfig:
repo_id: str
operation: (
DeleteEpisodesConfig
| SplitConfig
| MergeConfig
| RemoveFeatureConfig
| ModifyTasksConfig
| ConvertImageToVideoConfig
)
operation: OperationConfig
root: str | None = None
new_repo_id: str | None = None
push_to_hub: bool = False
@@ -433,6 +456,49 @@ def handle_convert_image_to_video(cfg: EditDatasetConfig) -> None:
logging.info("Dataset saved locally (not pushed to hub)")
def _get_dataset_size(repo_path):
import os
total = 0
with os.scandir(repo_path) as it:
for entry in it:
if entry.is_file():
total += entry.stat().st_size
elif entry.is_dir():
total += _get_dataset_size(entry.path)
return total
def handle_info(cfg: EditDatasetConfig):
if not isinstance(cfg.operation, InfoConfig):
raise ValueError("Operation config must be InfoConfig")
dataset = LeRobotDataset(cfg.repo_id, root=cfg.root)
sys.stdout.write(f"======Info {dataset.meta.repo_id}\n")
sys.stdout.write(f"Repository ID: {dataset.meta.repo_id} \n")
sys.stdout.write(f"Total episode: {dataset.meta.total_episodes} \n")
sys.stdout.write(f"Total task: {dataset.meta.total_tasks} \n")
sys.stdout.write(f"Total frame(Actual Count): {dataset.meta.total_frames}({len(dataset)}) \n")
sys.stdout.write(
f"Average frame per episode: {dataset.meta.total_frames / dataset.meta.total_episodes:.1f}\n"
)
sys.stdout.write(
f"Average episode time(sec): {(dataset.meta.total_frames / dataset.meta.total_episodes) / dataset.meta.fps:.1f}\n"
)
sys.stdout.write(f"FPS: {dataset.meta.fps}\n")
total_file_size = _get_dataset_size(dataset.root)
sys.stdout.write(f"Size: {total_file_size / (1024 * 1024):.1f} MB\n")
if cfg.operation.show_features:
import json
feature_dump_str = json.dumps(
dataset.meta.features, ensure_ascii=False, indent=4, sort_keys=True, separators=(",", ": ")
)
sys.stdout.write("Features:\n")
sys.stdout.write(f"{feature_dump_str}\n")
@parser.wrap()
def edit_dataset(cfg: EditDatasetConfig) -> None:
operation_type = cfg.operation.type
@@ -449,11 +515,11 @@ def edit_dataset(cfg: EditDatasetConfig) -> None:
handle_modify_tasks(cfg)
elif operation_type == "convert_image_to_video":
handle_convert_image_to_video(cfg)
elif operation_type == "info":
handle_info(cfg)
else:
raise ValueError(
f"Unknown operation type: {operation_type}\n"
f"Available operations: delete_episodes, split, merge, remove_feature, modify_tasks, convert_image_to_video"
)
available = ", ".join(OperationConfig.get_known_choices())
raise ValueError(f"Unknown operation: {operation_type}\nAvailable operations: {available}")
def main() -> None:
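Review note on the `OperationConfig` refactor above: the per-class `type: str = "..."` fields are replaced by subclasses registered under a name, and the error message now lists the registered names instead of a hard-coded string. The sketch below mimics that pattern in plain Python so it runs without `draccus`. The registry here is a simplified stand-in, not `draccus.ChoiceRegistry` itself, and `build_operation` is a hypothetical helper.

```python
from __future__ import annotations

from dataclasses import dataclass


class OperationConfig:
    """Simplified stand-in for a choice registry keyed by operation name."""

    _registry = {}

    @classmethod
    def register_subclass(cls, name):
        def decorator(subclass):
            cls._registry[name] = subclass
            subclass._choice_name = name
            return subclass

        return decorator

    @classmethod
    def get_known_choices(cls):
        return dict(cls._registry)

    @property
    def type(self):
        # The name is derived from the registration, not stored as a field.
        return self._choice_name


@OperationConfig.register_subclass("delete_episodes")
@dataclass
class DeleteEpisodesConfig(OperationConfig):
    episode_indices: list[int] | None = None


@OperationConfig.register_subclass("info")
@dataclass
class InfoConfig(OperationConfig):
    show_features: bool = False


def build_operation(type_name, **fields):
    """Look up the registered class for `type_name` and instantiate it."""
    choices = OperationConfig.get_known_choices()
    if type_name not in choices:
        available = ", ".join(choices)
        raise ValueError(f"Unknown operation: {type_name}\nAvailable operations: {available}")
    return choices[type_name](**fields)
```

Adding a new operation then only requires a new decorated dataclass; the union type on `EditDatasetConfig.operation` and the list of names in the error message no longer need manual updates.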
+12
@@ -43,6 +43,17 @@ lerobot-eval \
Note that in both examples, the repo/folder should contain at least `config.json` and `model.safetensors` files.
You can also evaluate a model on a Hub environment with custom kwargs:
```
lerobot-eval \
--policy.path=HF_USER/HF_REPO \
--env=HF_USER/HF_REPO \
--eval.batch_size=1 \
--eval.n_episodes=10 \
--env_kwargs.environment=env_A \
--env_kwargs.embodiment=emb_B \
```
You can learn about the CLI options for this script in the `EvalPipelineConfig` in lerobot/configs/eval.py
"""
@@ -521,6 +532,7 @@ def eval_main(cfg: EvalPipelineConfig):
n_envs=cfg.eval.batch_size,
use_async_envs=cfg.eval.use_async_envs,
trust_remote_code=cfg.trust_remote_code,
**cfg.env_kwargs,
)
logging.info("Making policy.")
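Review note on `**cfg.env_kwargs` above: extra `--env_kwargs.<name>=<value>` options are collected into a mapping and forwarded as keyword arguments to the environment factory. The sketch below shows the forwarding idea only. Both functions are hypothetical, the parsing is deliberately naive (values stay strings), and it is not how the project's config parser works.

```python
def parse_env_kwargs(argv, prefix="--env_kwargs."):
    """Collect `--env_kwargs.key=value` arguments into a dict of strings."""
    kwargs = {}
    for arg in argv:
        if arg.startswith(prefix) and "=" in arg:
            key, value = arg[len(prefix):].split("=", 1)
            kwargs[key] = value
    return kwargs


def make_env(env_id, n_envs=1, **env_kwargs):
    """Hypothetical factory: returns the settings it would build the env with."""
    return {"env_id": env_id, "n_envs": n_envs, **env_kwargs}


argv = [
    "--env=HF_USER/HF_REPO",
    "--env_kwargs.environment=env_A",
    "--env_kwargs.embodiment=emb_B",
]
settings = make_env("HF_USER/HF_REPO", n_envs=1, **parse_env_kwargs(argv))
```

One design consequence: a key in `env_kwargs` that collides with a named parameter of the factory (for example `n_envs`) raises a `TypeError` at call time, so custom keys should avoid the factory's own parameter names.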
+38 -5
@@ -26,8 +26,10 @@ lerobot-record \
--dataset.repo_id=<my_username>/<my_dataset_name> \
--dataset.num_episodes=2 \
--dataset.single_task="Grab the cube" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
--display_data=true
# <- Optional: specify video codec (h264, hevc, libsvtav1). Default is libsvtav1. \
# <- Optional: specify video codec (auto, h264, hevc, libsvtav1). Default is libsvtav1. \
# --dataset.vcodec=h264 \
# <- Teleop optional if you want to teleoperate to record or in between episodes with a policy \
# --teleop.type=so100_leader \
@@ -58,7 +60,10 @@ lerobot-record \
--display_data=true \
--dataset.repo_id=${HF_USER}/bimanual-so-handover-cube \
--dataset.num_episodes=25 \
--dataset.single_task="Grab and handover the red cube to the other arm"
--dataset.single_task="Grab and handover the red cube to the other arm" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2
# --dataset.vcodec=auto
```
"""
@@ -179,9 +184,19 @@ class DatasetRecordConfig:
# Number of episodes to record before batch encoding videos
# Set to 1 for immediate encoding (default behavior), or higher for batched encoding
video_encoding_batch_size: int = 1
# Video codec for encoding videos. Options: 'h264', 'hevc', 'libsvtav1'.
# Use 'h264' for faster encoding on systems where AV1 encoding is CPU-heavy.
# Video codec for encoding videos. Options: 'h264', 'hevc', 'libsvtav1', 'auto',
# or hardware-specific: 'h264_videotoolbox', 'h264_nvenc', 'h264_vaapi', 'h264_qsv'.
# Use 'auto' to auto-detect the best available hardware encoder.
vcodec: str = "libsvtav1"
# Enable streaming video encoding: encode frames in real-time during capture instead
# of writing PNG images first. Makes save_episode() near-instant. More info in the documentation: https://huggingface.co/docs/lerobot/streaming_video_encoding
streaming_encoding: bool = False
# Maximum number of frames to buffer per camera when using streaming encoding.
# ~1s buffer at 30fps. Provides backpressure if the encoder can't keep up.
encoder_queue_maxsize: int = 30
# Number of threads per encoder instance. None = auto (codec default).
# Lower values reduce CPU usage. Maps to 'lp' (via svtav1-params) for libsvtav1 and 'threads' for h264/hevc.
encoder_threads: int | None = None
# Rename map for the observation to override the image and state keys
rename_map: dict[str, str] = field(default_factory=dict)
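Review note on `encoder_queue_maxsize` above: the comment says the bounded buffer "provides backpressure if the encoder can't keep up". The encoder itself is not part of this diff, so the following is only a sketch of that mechanism using the standard library: a producer blocks on `put()` once the queue holds `queue_maxsize` frames. `encode_stream` is a hypothetical function, and appending to a list stands in for real video encoding.

```python
import queue
import threading


def encode_stream(frames, queue_maxsize=30):
    """Push frames through a bounded queue to a worker thread; return what it consumed."""
    frame_queue = queue.Queue(maxsize=queue_maxsize)
    encoded = []

    def worker():
        while True:
            frame = frame_queue.get()
            if frame is None:  # sentinel: no more frames
                break
            encoded.append(frame)  # a real encoder would compress the frame here

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    for frame in frames:
        frame_queue.put(frame)  # blocks while the queue is full (backpressure)
    frame_queue.put(None)
    thread.join()
    return encoded
```

With a 30 fps camera, a maximum size of 30 corresponds to roughly one second of buffered frames, matching the "~1s buffer at 30fps" comment in the config.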
@@ -398,7 +413,14 @@ def record_loop(
)
dt_s = time.perf_counter() - start_loop_t
precise_sleep(max(1 / fps - dt_s, 0.0))
sleep_time_s: float = 1 / fps - dt_s
if sleep_time_s < 0:
logging.warning(
f"Record loop is running slower ({1 / dt_s:.1f} Hz) than the target FPS ({fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
)
precise_sleep(max(sleep_time_s, 0.0))
timestamp = time.perf_counter() - start_episode_t
@@ -445,6 +467,9 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
root=cfg.dataset.root,
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
vcodec=cfg.dataset.vcodec,
streaming_encoding=cfg.dataset.streaming_encoding,
encoder_queue_maxsize=cfg.dataset.encoder_queue_maxsize,
encoder_threads=cfg.dataset.encoder_threads,
)
if hasattr(robot, "cameras") and len(robot.cameras) > 0:
@@ -467,6 +492,9 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras),
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
vcodec=cfg.dataset.vcodec,
streaming_encoding=cfg.dataset.streaming_encoding,
encoder_queue_maxsize=cfg.dataset.encoder_queue_maxsize,
encoder_threads=cfg.dataset.encoder_threads,
)
# Load pretrained policy
@@ -490,6 +518,11 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
listener, events = init_keyboard_listener()
if not cfg.dataset.streaming_encoding:
logging.info(
"Streaming encoding is disabled. If you have capable hardware, consider enabling it for way faster episode saving. --dataset.streaming_encoding=true --dataset.encoder_threads=2 # --dataset.vcodec=auto. More info in the documentation: https://huggingface.co/docs/lerobot/streaming_video_encoding"
)
with VideoEncodingManager(dataset):
recorded_episodes = 0
while recorded_episodes < cfg.dataset.num_episodes and not events["stop_recording"]:
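Review note on the `record_loop` timing change above: the loop now computes the remaining time budget per step and warns when it is negative, instead of silently clamping it to zero. The sketch below isolates that logic. `run_fixed_rate` is a hypothetical helper, and the `clock` and `sleep` parameters are added here only so the behaviour can be checked without real delays.

```python
import time


def run_fixed_rate(step, fps, n_steps, clock=time.perf_counter, sleep=time.sleep):
    """Call `step` n_steps times at roughly `fps` Hz; return the number of overruns."""
    overruns = 0
    period_s = 1 / fps
    for _ in range(n_steps):
        start_t = clock()
        step()
        dt_s = clock() - start_t
        sleep_time_s = period_s - dt_s
        if sleep_time_s < 0:
            # The step took longer than one period: the loop runs below target FPS.
            overruns += 1
        sleep(max(sleep_time_s, 0.0))
    return overruns
```

Counting overruns rather than logging on every slow iteration is one possible variation; the hunk above logs a warning each time, which can become noisy if the loop is consistently slow.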

Some files were not shown because too many files have changed in this diff.