Compare commits

..

18 Commits

Author SHA1 Message Date
Michel Aractingi e5e1c97a6c pi0 hack 2025-10-30 14:17:36 +01:00
Michel Aractingi 1594ae60a7 * Change Diffusion policy to use chunk_size notation instead of horizon to standerize the variable names across policies
* reshape noise after taking it as output of the network
2025-10-29 15:22:27 +01:00
Michel Aractingi 7cd710857d update factory with dsrl 2025-10-13 16:12:39 +02:00
Michel Aractingi 5c9bfd57ec Add dsrl policy files 2025-10-13 15:45:16 +02:00
Michel Aractingi f2ff370459 Incremental parquet writing (#1903)
* incremental parquet writing

* add .finalise() and a backup __del__ for stopping writers

* fix missing import

* precommit fixes added back the use of embed images

* added lazy loading for hf_Dataset to avoid frequently reloading the dataset during recording

* fix bug in video timestamps

* Added proper closing of parquet file before reading

* Added rigorous testing to validate the consistency of the meta data after creation of a new dataset

* fix bug in episode index during clear_episode_buffer

* fix(empty concat): check for empty paths list before data files concatenation

* fix(v3.0 message): updating v3.0 backward compatibility message.

* added fixes for the resume logic

* answering co-pilot review

* reverting some changes and style nits

* removed unused functions

* fix chunk_id and file_id when resuming

* - fix parquet loading when resuming
- add test to verify the parquet file integrity when resuming so that data files are now overwritten

* added general function get_file_size_in_mb and removed the one for video

* fix table size value when resuming

* Remove unnecessary reloading of the parquet file when resuming record.
Write to a new parquet file when resuming record

* added back reading parquet file for image datasets only

* - respond to Qlhoest comments
- Use pyarrows `from_pydict` function
- Add buffer for episode metadata to write to the parquet file in batches to improve efficiency
- Remove the  use of `to_parquet_with_hf_images`

* fix(dataset_tools) with the new logic using proper finalize
bug in finding the latest path of the metdata that was pointing to the data files
added check for the metadata size in the case the metadatabuffer was not written yet

* nit in flush_metadata_buffer

* fix(lerobot_dataset) return the right dataset len when a subset of the dataset is requested

---------

Co-authored-by: Harsimrat Sandhawalia <hs.sandhawalia@gmail.com>
2025-10-11 11:01:30 +02:00
Juan Pizarro 25f60c301b use TeleopEvents.RERECORD_EPISODE in gym_manipulator (#2165)
Co-authored-by: Michel Aractingi <michel.aractingi@huggingface.co>
2025-10-11 00:15:42 +02:00
Jade Choghari 0699b46d87 refactor(envs): add custom-observation-size (#2167) 2025-10-10 20:41:37 +02:00
Michel Aractingi b8f7e401d4 Dataset tools (#2100)
* feat(dataset-tools): add dataset utilities and example script

- Introduced dataset tools for LeRobotDataset, including functions for deleting episodes, splitting datasets, adding/removing features, and merging datasets.
- Added an example script demonstrating the usage of these utilities.
- Implemented comprehensive tests for all new functionalities to ensure reliability and correctness.

* style fixes

* move example to dataset dir

* missing lisence

* fixes mostly path

* clean comments

* move tests to functions instead of class based

* - fix video editting, decode, delete frames and rencode video
- copy unchanged video and parquet files to avoid recreating the entire dataset

* Fortify tooling tests

* Fix type issue resulting from saving numpy arrays with shape 3,1,1

* added lerobot_edit_dataset

* - revert changes in examples
- remove hardcoded split names

* update comment

* fix comment
add lerobot-edit-dataset shortcut

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Michel Aractingi <michel.aractingi@huggingface.co>

* style nit after copilot review

* fix: bug in dataset root when editing the dataset in place (without setting new_repo_id

* Fix bug in aggregate.py when accumelating video timestamps; add tests to fortify aggregate videos

* Added missing output repo id

* migrate delete episode to using pyav instead of decoding, writing frames to disk and encoding again.
Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>

* added modified suffix in case repo_id is not set in delete_episode

* adding docs for dataset tools

* bump av version and add back time_base assignment

* linter

* modified push_to_hub logic in lerobot_edit_dataset

* fix(progress bar): fixing the progress bar issue in dataset tools

* chore(concatenate): removing no longer needed concatenate_datasets usage

* fix(file sizes forwarding): forwarding files and chunk sizes in metadata info when splitting and aggregating datasets

* style fix

* refactor(aggregate): Fix video indexing and timestamp bugs in dataset merging

There were three critical bugs in aggregate.py that prevented correct dataset merging:

1. Video file indices: Changed from += to = assignment to correctly reference
   merged video files

2. Video timestamps: Implemented per-source-file offset tracking to maintain
   continuous timestamps when merging split datasets (was causing non-monotonic
   timestamp warnings)

3. File rotation offsets: Store timestamp offsets after rotation decision to
   prevent out-of-bounds frame access (was causing "Invalid frame index" errors
   with small file size limits)

Changes:
- Updated update_meta_data() to apply per-source-file timestamp offsets
- Updated aggregate_videos() to track offsets correctly during file rotation
- Added get_video_duration_in_s import for duration calculation

* Improved docs for split dataset and added a check for the possible case that the split size results in zero episodes

* chore(docs): update merge documentation details

Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>

---------

Co-authored-by: CarolinePascal <caroline8.pascal@gmail.com>
Co-authored-by: Jack Vial <vialjack@gmail.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2025-10-10 12:32:07 +02:00
Pepijn 656fc0f059 Remove validate_robot_cameras_for_policy (#2150)
* Remove validate_robot_cameras_for_policy as with rename processor the image keys can be renamed an mapped

* fix precommit
2025-10-10 11:34:21 +02:00
Steven Palma 829d2d1ad9 fic(docs): local docs links (#2149) 2025-10-09 15:20:07 +02:00
Pepijn 4ccf28437a Add act documentation (#2139)
* Add act documentation

* remove citation as we link the paper

* simplify docs

* fix pre commit
2025-10-08 20:07:14 +02:00
Steven Palma 9a49e57c72 refactor(datasets): add compress_level parameter to write_image() and set it to 1 (#2135)
* refactor(datasets): add compress_level parameter to write_image() and set it to 1

* docs(dataset): add docs to write_image()
2025-10-08 20:06:56 +02:00
Steven Palma 6c28ef894a chore(docs): add missing license headers (#2140) 2025-10-08 14:27:52 +02:00
Steven Palma bf3c8746b7 feat(devices): add lazy loading for 3rd party robots cameras and teleoperators (#2123)
* feat(devices): add lazy loading for 3rd party robots cameras and teleoperators

Co-authored-by: Darko Lukić <lukicdarkoo@gmail.com>

* feat(devices): load device class based on assumptions in naming

* docs(devices): instructions for using 3rd party devices

* docs: address review feedback

* chore(docs): add example for 3rd party devices

---------

Co-authored-by: Darko Lukić <lukicdarkoo@gmail.com>
2025-10-07 17:46:22 +02:00
Pepijn 9f32e00f90 fix(async): Add pre and post processing to async inference and update docs (#2132)
* Add pre and post processing to async inference and update docs

* precommit fix typo

* fix tests

* refactor(async): no None branching for processors in _predict_action_chunk

---------

Co-authored-by: Steven Palma <steven.palma@huggingface.co>
2025-10-07 15:10:31 +02:00
Michel Aractingi fcaa0ea5f9 remove extra time base set. (#2133)
Co-authored-by: CarolinePascal <caroline8.pascal@gmail.com>
2025-10-07 14:09:36 +02:00
Iulia Feroli 5ac9356135 Update README.md to fix broken link to example notebook for visuals (#2117)
Folder structure of examples seems to have changed with extra `dataset` folder and the notebook has also changed names.

Signed-off-by: Iulia Feroli <iuliaferoli@gmail.com>
Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
2025-10-07 09:43:32 +02:00
Steven Palma b74e2a6113 feat(deps): ceil dependency versions (#2091) 2025-10-05 17:53:43 +02:00
69 changed files with 5531 additions and 343 deletions
+183
View File
@@ -0,0 +1,183 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This workflow handles full testing with unboud dependencies versions.
name: Unbound Dependency Tests
on:
# Allows running this workflow manually from the Actions tab
workflow_dispatch:
# Run on the 1st and 15th of every month at 09:00 UTC
schedule:
- cron: '0 2 1,15 * *'
permissions:
contents: read
# Sets up the environment variables
env:
UV_VERSION: "0.8.0"
PYTHON_VERSION: "3.10"
DOCKER_IMAGE_NAME: huggingface/lerobot-gpu:unbound
# Ensures that only the latest action is built, canceling older runs.
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
jobs:
# This job runs the E2E tests + pytest with all unbound extras
full-tests:
name: Full Unbound Tests
runs-on: ubuntu-latest
env:
MUJOCO_GL: egl
steps:
- uses: actions/checkout@v4
with:
lfs: true
persist-credentials: false
- name: Install apt dependencies
run: |
sudo apt-get update && sudo apt-get install -y build-essential \
git curl libglib2.0-0 libegl1-mesa-dev ffmpeg libusb-1.0-0-dev \
speech-dispatcher libgeos-dev portaudio19-dev
- name: Setup uv and Python
uses: astral-sh/setup-uv@v6 # zizmor: ignore[unpinned-uses]
with:
enable-cache: true
version: ${{ env.UV_VERSION }}
python-version: ${{ env.PYTHON_VERSION }}
- name: Unbound dependencies
run: |
sed -i 's/,[[:space:]]*<[0-9\.]*//g' pyproject.toml
echo "Dependencies unbound:" && cat pyproject.toml
- name: Install lerobot with all extras
run: uv sync --all-extras
- name: Run pytest (all extras)
run: uv run pytest tests -vv
- name: Run end-to-end tests
run: uv run make test-end-to-end
# This job builds a GPU enabled image for testing
build-and-push-docker:
name: Build and Push Docker
runs-on:
group: aws-general-8-plus
outputs:
image_tag: ${{ env.DOCKER_IMAGE_NAME }}
env:
GITHUB_REF: ${{ github.ref }}
steps:
- name: Install Git LFS
run: |
sudo apt-get update
sudo apt-get install git-lfs
git lfs install
- uses: actions/checkout@v4
with:
lfs: true
persist-credentials: false
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3 # zizmor: ignore[unpinned-uses]
with:
cache-binary: false
- name: Login to Docker Hub
uses: docker/login-action@v3 # zizmor: ignore[unpinned-uses]
with:
username: ${{ secrets.DOCKERHUB_LEROBOT_USERNAME }}
password: ${{ secrets.DOCKERHUB_LEROBOT_PASSWORD }}
- name: Build and push Docker image
uses: docker/build-push-action@v6 # zizmor: ignore[unpinned-uses]
with:
context: .
file: ./docker/Dockerfile.internal
push: true
tags: ${{ env.DOCKER_IMAGE_NAME }}
build-args: |
UNBOUND_DEPS=true
# This job runs pytest with all unbound extras in a GPU enabled host
# It runs everytime a test image is created
gpu-tests:
name: GPU Unbound Tests
needs: [build-and-push-docker]
runs-on:
group: aws-g6-4xlarge-plus
env:
HF_HOME: /home/user_lerobot/.cache/huggingface
HF_LEROBOT_HOME: /home/user_lerobot/.cache/huggingface/lerobot
TORCH_HOME: /home/user_lerobot/.cache/torch
TRITON_CACHE_DIR: /home/user_lerobot/.cache/triton
container:
image: ${{ needs.build-and-push-docker.outputs.image_tag }} # zizmor: ignore[unpinned-images]
options: --gpus all --shm-size "16gb"
credentials:
username: ${{ secrets.DOCKERHUB_LEROBOT_USERNAME }}
password: ${{ secrets.DOCKERHUB_LEROBOT_PASSWORD }}
defaults:
run:
shell: bash
working-directory: /lerobot
steps:
- name: Run pytest on GPU
run: pytest tests -vv
- name: Run end-to-end tests
run: make test-end-to-end
# This job deletes the test image recently created
# It runs everytime after the gpu-tests have finished
delete-unbound-image:
name: Delete Unbound Image
needs: [gpu-tests, build-and-push-docker]
if: always() && needs.build-and-push-docker.result == 'success'
runs-on: ubuntu-latest
steps:
- name: Get Docker Hub Token and Delete Image
# zizmor: ignore[template-injection]
run: |
IMAGE_NAME=$(echo "${{ needs.build-and-push-docker.outputs.image_tag }}" | cut -d':' -f1)
IMAGE_TAG=$(echo "${{ needs.build-and-push-docker.outputs.image_tag }}" | cut -d':' -f2)
echo "Attempting to delete image: $IMAGE_NAME:$IMAGE_TAG"
TOKEN=$(curl -s -H "Content-Type: application/json" \
-X POST \
-d '{"username": "${{ secrets.DOCKERHUB_LEROBOT_USERNAME }}", "password": "${{ secrets.DOCKERHUB_LEROBOT_PASSWORD }}"}' \
https://hub.docker.com/v2/users/login/ | jq -r .token)
if [ "$TOKEN" == "null" ] || [ -z "$TOKEN" ]; then
echo "::error::Failed to get Docker Hub token."
exit 1
fi
HTTP_RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" \
-H "Authorization: JWT ${TOKEN}" \
-X DELETE \
https://hub.docker.com/v2/repositories/${IMAGE_NAME}/tags/${IMAGE_TAG}/)
if [ "$HTTP_RESPONSE" -eq 204 ]; then
echo "Successfully deleted Docker image tag: $IMAGE_NAME:$IMAGE_TAG"
else
echo "::error::Failed to delete Docker image. HTTP status: $HTTP_RESPONSE"
exit 1
fi
+1 -1
View File
@@ -197,7 +197,7 @@ wandb login
### Visualize datasets
Check out [example 1](https://github.com/huggingface/lerobot/blob/main/examples/1_load_lerobot_dataset.py) that illustrates how to use our dataset class which automatically downloads data from the Hugging Face hub.
Check out [example 1](https://github.com/huggingface/lerobot/blob/main/examples/dataset/load_lerobot_dataset.py) that illustrates how to use our dataset class which automatically downloads data from the Hugging Face hub.
You can also locally visualize episodes from a dataset on the hub by executing our script from the command line:
+8
View File
@@ -75,6 +75,14 @@ RUN uv venv --python python${PYTHON_VERSION}
# Install Python dependencies for caching
COPY --chown=user_lerobot:user_lerobot pyproject.toml README.md MANIFEST.in ./
COPY --chown=user_lerobot:user_lerobot src/ src/
ARG UNBOUND_DEPS=false
RUN if [ "$UNBOUND_DEPS" = "true" ]; then \
sed -i 's/,[[:space:]]*<[0-9\.]*//g' pyproject.toml; \
echo "Dependencies unbound:" && cat pyproject.toml; \
fi
RUN uv pip install --no-cache ".[all]"
# Copy the rest of the application source code
+8
View File
@@ -61,6 +61,14 @@ RUN uv venv
# Install Python dependencies for caching
COPY --chown=user_lerobot:user_lerobot pyproject.toml README.md MANIFEST.in ./
COPY --chown=user_lerobot:user_lerobot src/ src/
ARG UNBOUND_DEPS=false
RUN if [ "$UNBOUND_DEPS" = "true" ]; then \
sed -i 's/,[[:space:]]*<[0-9\.]*//g' pyproject.toml; \
echo "Dependencies unbound:" && cat pyproject.toml; \
fi
RUN uv pip install --no-cache ".[all]"
# Copy the rest of the application code
+4
View File
@@ -25,8 +25,12 @@
title: Using LeRobotDataset
- local: porting_datasets_v3
title: Porting Large Datasets
- local: using_dataset_tools
title: Using the Dataset Tools
title: "Datasets"
- sections:
- local: act
title: ACT
- local: smolvla
title: SmolVLA
- local: pi0
+92
View File
@@ -0,0 +1,92 @@
# ACT (Action Chunking with Transformers)
ACT is a **lightweight and efficient policy for imitation learning**, especially well-suited for fine-grained manipulation tasks. It's the **first model we recommend when you're starting out** with LeRobot due to its fast training time, low computational requirements, and strong performance.
<div class="video-container">
<iframe
width="100%"
height="415"
src="https://www.youtube.com/embed/ft73x0LfGpM"
title="LeRobot ACT Tutorial"
frameborder="0"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture"
allowfullscreen
></iframe>
</div>
_Watch this tutorial from the LeRobot team to learn how ACT works: [LeRobot ACT Tutorial](https://www.youtube.com/watch?v=ft73x0LfGpM)_
## Model Overview
Action Chunking with Transformers (ACT) was introduced in the paper [Learning Fine-Grained Bimanual Manipulation with Low-Cost Hardware](https://arxiv.org/abs/2304.13705) by Zhao et al. The policy was designed to enable precise, contact-rich manipulation tasks using affordable hardware and minimal demonstration data.
### Why ACT is Great for Beginners
ACT stands out as an excellent starting point for several reasons:
- **Fast Training**: Trains in a few hours on a single GPU
- **Lightweight**: Only ~80M parameters, making it efficient and easy to work with
- **Data Efficient**: Often achieves high success rates with just 50 demonstrations
### Architecture
ACT uses a transformer-based architecture with three main components:
1. **Vision Backbone**: ResNet-18 processes images from multiple camera viewpoints
2. **Transformer Encoder**: Synthesizes information from camera features, joint positions, and a learned latent variable
3. **Transformer Decoder**: Generates coherent action sequences using cross-attention
The policy takes as input:
- Multiple RGB images (e.g., from wrist cameras, front/top cameras)
- Current robot joint positions
- A latent style variable `z` (learned during training, set to zero during inference)
And outputs a chunk of `k` future action sequences.
## Installation Requirements
1. Install LeRobot by following our [Installation Guide](./installation).
2. ACT is included in the base LeRobot installation, so no additional dependencies are needed!
## Training ACT
ACT works seamlessly with the standard LeRobot training pipeline. Here's a complete example for training ACT on your dataset:
```bash
lerobot-train \
--dataset.repo_id=${HF_USER}/your_dataset \
--policy.type=act \
--output_dir=outputs/train/act_your_dataset \
--job_name=act_your_dataset \
--policy.device=cuda \
--wandb.enable=true \
--policy.repo_id=${HF_USER}/act_policy
```
### Training Tips
1. **Start with defaults**: ACT's default hyperparameters work well for most tasks
2. **Training duration**: Expect a few hours for 100k training steps on a single GPU
3. **Batch size**: Start with batch size 8 and adjust based on your GPU memory
### Train using Google Colab
If your local computer doesn't have a powerful GPU, you can utilize Google Colab to train your model by following the [ACT training notebook](./notebooks#training-act).
## Evaluating ACT
Once training is complete, you can evaluate your ACT policy using the `lerobot-record` command with your trained policy. This will run inference and record evaluation episodes:
```bash
lerobot-record \
--robot.type=so100_follower \
--robot.port=/dev/ttyACM0 \
--robot.id=my_robot \
--robot.cameras="{ front: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \
--display_data=true \
--dataset.repo_id=${HF_USER}/eval_act_your_dataset \
--dataset.num_episodes=10 \
--dataset.single_task="Your task description" \
--policy.path=${HF_USER}/act_policy
```
+8 -8
View File
@@ -31,15 +31,15 @@ Then, spin up a policy server (in one terminal, or in a separate machine) specif
You can spin up a policy server running:
```shell
python src/lerobot/async_inference/policy_server.py \
--host=127.0.0.1 \
--port=8080 \
python -m lerobot.async_inference.policy_server \
--host=127.0.0.1 \
--port=8080
```
This will start a policy server listening on `127.0.0.1:8080` (`localhost`, port 8080). At this stage, the policy server is empty, as all information related to which policy to run and with which parameters are specified during the first handshake with the client. Spin up a client with:
```shell
python src/lerobot/async_inference/robot_client.py \
python -m lerobot.async_inference.robot_client \
--server_address=127.0.0.1:8080 \ # SERVER: the host address and port of the policy server
--robot.type=so100_follower \ # ROBOT: your robot type
--robot.port=/dev/tty.usbmodem585A0076841 \ # ROBOT: your robot port
@@ -113,9 +113,9 @@ As such, spinning up a policy server is as easy as specifying the host address a
<hfoptions id="start_policy_server">
<hfoption id="Command">
```bash
python -m lerobot.scripts.server.policy_server \
--host="localhost" \
--port=8080
python -m lerobot.async_inference.policy_server \
--host=127.0.0.1 \
--port=8080
```
</hfoption>
<hfoption id="API example">
@@ -148,7 +148,7 @@ The `RobotClient` streams observations to the `PolicyServer`, and receives actio
<hfoptions id="start_robot_client">
<hfoption id="Command">
```bash
python src/lerobot/async_inference/robot_client.py \
python -m lerobot.async_inference.robot_client \
--server_address=127.0.0.1:8080 \ # SERVER: the host address and port of the policy server
--robot.type=so100_follower \ # ROBOT: your robot type
--robot.port=/dev/tty.usbmodem585A0076841 \ # ROBOT: your robot port
+130 -2
View File
@@ -8,7 +8,7 @@ To that end, we provide the [`Robot`](https://github.com/huggingface/lerobot/blo
- Your own robot which exposes a communication interface (e.g. serial, CAN, TCP)
- A way to read sensor data and send motor commands programmatically, e.g. manufacturer's SDK or API, or your own protocol implementation.
- LeRobot installed in your environment. Follow our [Installation Guide](./installation.mdx).
- LeRobot installed in your environment. Follow our [Installation Guide](./installation).
## Choose your motors
@@ -65,7 +65,7 @@ class MyCoolRobotConfig(RobotConfig):
```
<!-- prettier-ignore-end -->
[Cameras tutorial](./cameras.mdx) to understand how to detect and add your camera.
[Cameras tutorial](./cameras) to understand how to detect and add your camera.
Next, we'll create our actual robot class which inherits from `Robot`. This abstract class defines a contract you must follow for your robot to be usable with the rest of the LeRobot tools.
@@ -335,6 +335,134 @@ For implementing teleoperation devices, we also provide a [`Teleoperator`](https
The main differences are in the I/O functions: a teleoperator allows you to produce action via `get_action` and can receive feedback actions via `send_feedback`. Feedback could be anything controllable on the teleoperation device that could help the person controlling it understand the consequences of the actions sent. Think motion/force feedback on a leader arm, vibrations on a gamepad controller for example. To implement a teleoperator, you can follow this same tutorial and adapt it for these two methods.
## Using Your Own `LeRobot` Devices 🔌
You can easily extend `lerobot` with your own custom hardware—be it a camera, robot, or teleoperation device—by creating a separate, installable Python package. If you follow a few simple conventions, the `lerobot` command-line tools (like `lerobot-teleop` and `lerobot-record`) will **automatically discover and integrate your creations** without requiring any changes to the `lerobot` source code.
This guide outlines the conventions your plugin must follow.
### The 4 Core Conventions
To ensure your custom device is discoverable, you must adhere to the following four rules.
#### 1\. Create an Installable Package with a Specific Prefix
Your project must be a standard, installable Python package. Crucially, the name of your package (as defined in `pyproject.toml` or `setup.py`) must begin with one of these prefixes:
- `lerobot_robot_` for a robot.
- `lerobot_camera_` for a camera.
- `lerobot_teleoperator_` for a teleoperation device.
This prefix system is how `lerobot` automatically finds your plugin in the Python environment.
#### 2\. Follow the `SomethingConfig`/`Something` Naming Pattern
Your device's implementation class must be named after its configuration class, simply by removing the `Config` suffix.
- **Config Class:** `MyAwesomeTeleopConfig`
- **Device Class:** `MyAwesomeTeleop`
#### 3\. Place Your Files in a Predictable Structure
The device class (`MyAwesomeTeleop`) must be located in a predictable module relative to its configuration class (`MyAwesomeTeleopConfig`). `lerobot` will automatically search in these locations:
- In the **same module** as the config class.
- In a **submodule named after the device** (e.g., `my_awesome_teleop.py`).
The recommended and simplest structure is to place them in separate, clearly named files within the same directory.
#### 4\. Expose Classes in `__init__.py`
Your package's `__init__.py` file should import and expose both the configuration and the device classes, making them easily accessible.
### Putting It All Together: A Complete Example
Let's create a new teleoperator called `my_awesome_teleop`.
#### Directory Structure
Here is what the project folder should look like. The package name, `lerobot_teleoperator_my_awesome_teleop`, follows **Convention \#1**.
```
lerobot_teleoperator_my_awesome_teleop/
├── pyproject.toml # (or setup.py) lists lerobot as a dependency
└── lerobot_teleoperator_my_awesome_teleop/
├── __init__.py
├── config_my_awesome_teleop.py
└── my_awesome_teleop.py
```
#### File Contents
- **`config_my_awesome_teleop.py`**: Defines the configuration class. Note the `Config` suffix (**Convention \#2**).
```python
from dataclasses import dataclass
from lerobot.teleoperators.config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("my_awesome_teleop")
@dataclass
class MyAwesomeTeleopConfig(TeleoperatorConfig):
# Your configuration fields go here
port: str = "192.168.1.1"
```
- **`my_awesome_teleop.py`**: Implements the device. The class name `MyAwesomeTeleop` matches its config class name (**Convention \#2**). This file structure adheres to **Convention \#3**.
```python
from lerobot.teleoperators.teleoperator import Teleoperator
from .config_my_awesome_teleop import MyAwesomeTeleopConfig
class MyAwesomeTeleop(Teleoperator):
config_class = MyAwesomeTeleopConfig
name = "my_awesome_teleop"
def __init__(self, config: MyAwesomeTeleopConfig):
super().__init__(config)
self.config = config
# Your device logic (e.g., connect) goes here
```
- **`__init__.py`**: Exposes the key classes (**Convention \#4**).
```python
from .config_my_awesome_teleop import MyAwesomeTeleopConfig
from .my_awesome_teleop import MyAwesomeTeleop
```
### Installation and Usage
1. **Install your new plugin in your Python environment.** You can install your local plugin package using `pip`'s editable mode or from PyPi.
```bash
# Locally
# Navigate to your plugin's root directory and install it
cd lerobot_teleoperator_my_awesome_teleop
pip install -e .
# From PyPi
pip install lerobot_teleoperator_my_awesome_teleop
```
2. **Use it directly from the command line.** Now, you can use your custom device by referencing its type.
```bash
lerobot-teleoperate --teleop.type=my_awesome_teleop \
# other arguments
```
And that's it\! Your custom device is now fully integrated.
### Looking for an example ?
Check out these two packages from the community:
- https://github.com/SpesRobotics/lerobot-robot-xarm
- https://github.com/SpesRobotics/lerobot-teleoperator-teleop
## Wrapping Up
Once your robot class is complete, you can leverage the LeRobot ecosystem:
+3 -3
View File
@@ -297,9 +297,9 @@ LeRobot provides many registered processor steps. Here are the most commonly use
### Next Steps
- **[Implement Your Own Processor](implement_your_own_processor.mdx)** - Create custom processor steps
- **[Debug Your Pipeline](debug_processor_pipeline.mdx)** - Troubleshoot and optimize pipelines
- **[Processors for Robots and Teleoperators](processors_robots_teleop.mdx)** - Real-world integration patterns
- **[Implement Your Own Processor](./implement_your_own_processor)** - Create custom processor steps
- **[Debug Your Pipeline](./debug_processor_pipeline)** - Troubleshoot and optimize pipelines
- **[Processors for Robots and Teleoperators](./processors_robots_teleop)** - Real-world integration patterns
## Summary
+1 -1
View File
@@ -79,7 +79,7 @@ After running the example:
- Android: after starting the script, open the printed local URL on your phone, tap Start, then press and hold Move.
- iOS: open HEBI Mobile I/O first; B1 enables motion. A3 controls the gripper.
Additionally you can customize mapping or safety limits by editing the processor steps shown in the examples. You can also remap inputs (e.g., use a different analog input) or adapt the pipeline to other robots (e.g., LeKiwi) by modifying the input and kinematics steps. More about this in the [Processors for Robots and Teleoperators](./processors_robots_teleop.mdx) guide.
Additionally you can customize mapping or safety limits by editing the processor steps shown in the examples. You can also remap inputs (e.g., use a different analog input) or adapt the pipeline to other robots (e.g., LeKiwi) by modifying the input and kinematics steps. More about this in the [Processors for Robots and Teleoperators](./processors_robots_teleop) guide.
- Run this example to record a dataset, which saves absolute end effector observations and actions:
+102
View File
@@ -0,0 +1,102 @@
# Using Dataset Tools
This guide covers the dataset tools utilities available in LeRobot for modifying and editing existing datasets.
## Overview
LeRobot provides several utilities for manipulating datasets:
1. **Delete Episodes** - Remove specific episodes from a dataset
2. **Split Dataset** - Divide a dataset into multiple smaller datasets
3. **Merge Datasets** - Combine multiple datasets into one. The datasets must have identical features, and episodes are concatenated in the order specified in `repo_ids`
4. **Add Features** - Add new features to a dataset
5. **Remove Features** - Remove features from a dataset
The core implementation is in `lerobot.datasets.dataset_tools`.
An example script detailing how to use the tools API is available in `examples/dataset/use_dataset_tools.py`.
## Command-Line Tool: lerobot-edit-dataset
`lerobot-edit-dataset` is a command-line script for editing datasets. It can be used to delete episodes, split datasets, merge datasets, add features, and remove features.
Run `lerobot-edit-dataset --help` for more information on the configuration of each operation.
### Usage Examples
#### Delete Episodes
Remove specific episodes from a dataset. This is useful for filtering out undesired data.
```bash
# Delete episodes 0, 2, and 5 (modifies original dataset)
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type delete_episodes \
--operation.episode_indices "[0, 2, 5]"
# Delete episodes and save to a new dataset (preserves original dataset)
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--new_repo_id lerobot/pusht_after_deletion \
--operation.type delete_episodes \
--operation.episode_indices "[0, 2, 5]"
```
#### Split Dataset
Divide a dataset into multiple subsets.
```bash
# Split by fractions (e.g. 80% train, 20% test, 20% val)
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type split \
--operation.splits '{"train": 0.8, "test": 0.2, "val": 0.2}'
# Split by specific episode indices
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type split \
--operation.splits '{"task1": [0, 1, 2, 3], "task2": [4, 5]}'
```
There are no constraints on the split names, they can be determined by the user. Resulting datasets are saved under the repo id with the split name appended, e.g. `lerobot/pusht_train`, `lerobot/pusht_task1`, `lerobot/pusht_task2`.
#### Merge Datasets
Combine multiple datasets into a single dataset.
```bash
# Merge train and validation splits back into one dataset
lerobot-edit-dataset \
--repo_id lerobot/pusht_merged \
--operation.type merge \
--operation.repo_ids "['lerobot/pusht_train', 'lerobot/pusht_val']"
```
#### Remove Features
Remove features from a dataset.
```bash
# Remove a camera feature
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--operation.type remove_feature \
--operation.feature_names "['observation.images.top']"
```
### Push to Hub
Add the `--push_to_hub` flag to any command to automatically upload the resulting dataset to the Hugging Face Hub:
```bash
lerobot-edit-dataset \
--repo_id lerobot/pusht \
--new_repo_id lerobot/pusht_after_deletion \
--operation.type delete_episodes \
--operation.episode_indices "[0, 2, 5]" \
--push_to_hub
```
There is also a tool for adding features to a dataset that is not yet covered in `lerobot-edit-dataset`.
+117
View File
@@ -0,0 +1,117 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Example script demonstrating dataset tools utilities.
This script shows how to:
1. Delete episodes from a dataset
2. Split a dataset into train/val sets
3. Add/remove features
4. Merge datasets
Usage:
python examples/dataset/use_dataset_tools.py
"""
import numpy as np
from lerobot.datasets.dataset_tools import (
add_feature,
delete_episodes,
merge_datasets,
remove_feature,
split_dataset,
)
from lerobot.datasets.lerobot_dataset import LeRobotDataset
def main():
dataset = LeRobotDataset("lerobot/pusht")
print(f"Original dataset: {dataset.meta.total_episodes} episodes, {dataset.meta.total_frames} frames")
print(f"Features: {list(dataset.meta.features.keys())}")
print("\n1. Deleting episodes 0 and 2...")
filtered_dataset = delete_episodes(dataset, episode_indices=[0, 2], repo_id="lerobot/pusht_filtered")
print(f"Filtered dataset: {filtered_dataset.meta.total_episodes} episodes")
print("\n2. Splitting dataset into train/val...")
splits = split_dataset(
dataset,
splits={"train": 0.8, "val": 0.2},
)
print(f"Train split: {splits['train'].meta.total_episodes} episodes")
print(f"Val split: {splits['val'].meta.total_episodes} episodes")
print("\n3. Adding a reward feature...")
reward_values = np.random.randn(dataset.meta.total_frames).astype(np.float32)
dataset_with_reward = add_feature(
dataset,
feature_name="reward",
feature_values=reward_values,
feature_info={
"dtype": "float32",
"shape": (1,),
"names": None,
},
repo_id="lerobot/pusht_with_reward",
)
def compute_success(row_dict, episode_index, frame_index):
episode_length = 10
return float(frame_index >= episode_length - 10)
dataset_with_success = add_feature(
dataset_with_reward,
feature_name="success",
feature_values=compute_success,
feature_info={
"dtype": "float32",
"shape": (1,),
"names": None,
},
repo_id="lerobot/pusht_with_reward_and_success",
)
print(f"New features: {list(dataset_with_success.meta.features.keys())}")
print("\n4. Removing the success feature...")
dataset_cleaned = remove_feature(
dataset_with_success, feature_names="success", repo_id="lerobot/pusht_cleaned"
)
print(f"Features after removal: {list(dataset_cleaned.meta.features.keys())}")
print("\n5. Merging train and val splits back together...")
merged = merge_datasets([splits["train"], splits["val"]], output_repo_id="lerobot/pusht_merged")
print(f"Merged dataset: {merged.meta.total_episodes} episodes")
print("\n6. Complex workflow example...")
if len(dataset.meta.camera_keys) > 1:
camera_to_remove = dataset.meta.camera_keys[0]
print(f"Removing camera: {camera_to_remove}")
dataset_no_cam = remove_feature(
dataset, feature_names=camera_to_remove, repo_id="pusht_no_first_camera"
)
print(f"Remaining cameras: {dataset_no_cam.meta.camera_keys}")
print("\nDone! Check ~/.cache/huggingface/lerobot/ for the created datasets.")
if __name__ == "__main__":
main()
+33 -32
View File
@@ -59,20 +59,20 @@ keywords = ["lerobot", "huggingface", "robotics", "machine learning", "artifici
dependencies = [
# Hugging Face dependencies
"datasets>=4.0.0",
"diffusers>=0.27.2",
"huggingface-hub[hf-transfer,cli]>=0.34.2",
"datasets>=4.0.0,<4.2.0",
"diffusers>=0.27.2,<0.36.0",
"huggingface-hub[hf-transfer,cli]>=0.34.2,<0.36.0",
# Core dependencies
"cmake>=3.29.0.1",
"einops>=0.8.0",
"opencv-python-headless>=4.9.0",
"av>=14.2.0",
"jsonlines>=4.0.0",
"packaging>=24.2",
"pynput>=1.7.7",
"pyserial>=3.5",
"wandb>=0.20.0",
"cmake>=3.29.0.1,<4.2.0",
"einops>=0.8.0,<0.9.0",
"opencv-python-headless>=4.9.0,<4.13.0",
"av>=15.0.0,<16.0.0",
"jsonlines>=4.0.0,<5.0.0",
"packaging>=24.2,<26.0",
"pynput>=1.7.7,<1.9.0",
"pyserial>=3.5,<4.0",
"wandb>=0.20.0,<0.23.0",
"torch>=2.2.1,<2.8.0", # TODO: Bumb dependency
"torchcodec>=0.2.1,<0.6.0; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')", # TODO: Bumb dependency
@@ -92,26 +92,26 @@ dependencies = [
[project.optional-dependencies]
# Common
pygame-dep = ["pygame>=2.5.1"]
placo-dep = ["placo>=0.9.6"]
transformers-dep = ["transformers>=4.53.0"]
pygame-dep = ["pygame>=2.5.1,<2.7.0"]
placo-dep = ["placo>=0.9.6,<0.10.0"]
transformers-dep = ["transformers>=4.53.0,<5.0.0"]
grpcio-dep = ["grpcio==1.73.1", "protobuf==6.31.0"]
# Motors
feetech = ["feetech-servo-sdk>=1.0.0"]
dynamixel = ["dynamixel-sdk>=3.7.31"]
feetech = ["feetech-servo-sdk>=1.0.0,<2.0.0"]
dynamixel = ["dynamixel-sdk>=3.7.31,<3.9.0"]
# Robots
gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0"]
gamepad = ["lerobot[pygame-dep]", "hidapi>=0.14.0,<0.15.0"]
hopejr = ["lerobot[feetech]", "lerobot[pygame-dep]"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1"]
reachy2 = ["reachy2_sdk>=1.0.14"]
lekiwi = ["lerobot[feetech]", "pyzmq>=26.2.1,<28.0.0"]
reachy2 = ["reachy2_sdk>=1.0.14,<1.1.0"]
kinematics = ["lerobot[placo-dep]"]
intelrealsense = [
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'",
"pyrealsense2-macosx>=2.54 ; sys_platform == 'darwin'",
"pyrealsense2>=2.55.1.6486,<2.57.0 ; sys_platform != 'darwin'",
"pyrealsense2-macosx>=2.54,<2.55.0 ; sys_platform == 'darwin'",
]
phone = ["hebi-py>=2.8.0", "teleop>=0.1.0"]
phone = ["hebi-py>=2.8.0,<2.12.0", "teleop>=0.1.0,<0.2.0"]
# stretch = [
# "hello-robot-stretch-body>=0.7.27 ; sys_platform == 'linux'",
# "pyrender @ git+https://github.com/mmatl/pyrender.git ; sys_platform == 'linux'",
@@ -120,21 +120,21 @@ phone = ["hebi-py>=2.8.0", "teleop>=0.1.0"]
# Policies
pi = ["transformers @ git+https://github.com/huggingface/transformers.git@fix/lerobot_openpi"]
smolvla = ["lerobot[transformers-dep]", "num2words>=0.5.14", "accelerate>=1.7.0", "safetensors>=0.4.3"]
hilserl = ["lerobot[transformers-dep]", "gym-hil>=0.1.11", "lerobot[grpcio-dep]", "lerobot[placo-dep]"]
smolvla = ["lerobot[transformers-dep]", "num2words>=0.5.14,<0.6.0", "accelerate>=1.7.0,<2.0.0", "safetensors>=0.4.3,<1.0.0"]
hilserl = ["lerobot[transformers-dep]", "gym-hil>=0.1.11,<0.2.0", "lerobot[grpcio-dep]", "lerobot[placo-dep]"]
# Features
async = ["lerobot[grpcio-dep]", "matplotlib>=3.10.3"]
async = ["lerobot[grpcio-dep]", "matplotlib>=3.10.3,<4.0.0"]
# Development
dev = ["pre-commit>=3.7.0", "debugpy>=1.8.1", "lerobot[grpcio-dep]", "grpcio-tools==1.73.1"]
test = ["pytest>=8.1.0", "pytest-timeout>=2.4.0", "pytest-cov>=5.0.0", "mock-serial>=0.0.1 ; sys_platform != 'win32'"]
video_benchmark = ["scikit-image>=0.23.2", "pandas>=2.2.2"]
dev = ["pre-commit>=3.7.0,<5.0.0", "debugpy>=1.8.1,<1.9.0", "lerobot[grpcio-dep]", "grpcio-tools==1.73.1"]
test = ["pytest>=8.1.0,<9.0.0", "pytest-timeout>=2.4.0,<3.0.0", "pytest-cov>=5.0.0,<8.0.0", "mock-serial>=0.0.1,<0.1.0 ; sys_platform != 'win32'"]
video_benchmark = ["scikit-image>=0.23.2,<0.26.0", "pandas>=2.2.2,<2.4.0"]
# Simulation
aloha = ["gym-aloha>=0.1.1"]
pusht = ["gym-pusht>=0.1.5", "pymunk>=6.6.0,<7.0.0"] # TODO: Fix pymunk version in gym-pusht instead
xarm = ["gym-xarm>=0.1.1"]
aloha = ["gym-aloha>=0.1.1,<0.2.0"]
pusht = ["gym-pusht>=0.1.5,<0.2.0", "pymunk>=6.6.0,<7.0.0"] # TODO: Fix pymunk version in gym-pusht instead
xarm = ["gym-xarm>=0.1.1,<0.2.0"]
libero = ["lerobot[transformers-dep]", "libero @ git+https://github.com/huggingface/lerobot-libero.git@main#egg=libero"]
@@ -175,6 +175,7 @@ lerobot-dataset-viz="lerobot.scripts.lerobot_dataset_viz:main"
lerobot-info="lerobot.scripts.lerobot_info:main"
lerobot-find-joint-limits="lerobot.scripts.lerobot_find_joint_limits:main"
lerobot-imgtransform-viz="lerobot.scripts.lerobot_imgtransform_viz:main"
lerobot-edit-dataset="lerobot.scripts.lerobot_edit_dataset:main"
# ---------------- Tool Configurations ----------------
[tool.setuptools.packages.find]
-5
View File
@@ -142,11 +142,6 @@ class RobotClientConfig:
default=False, metadata={"help": "Visualize the action queue size"}
)
# Verification configuration
verify_robot_cameras: bool = field(
default=True, metadata={"help": "Verify that the robot cameras match the policy cameras"}
)
@property
def environment_dt(self) -> float:
"""Environment time step, in seconds"""
+1 -1
View File
@@ -26,4 +26,4 @@ DEFAULT_OBS_QUEUE_TIMEOUT = 2
SUPPORTED_POLICIES = ["act", "smolvla", "diffusion", "tdmpc", "vqbet", "pi0", "pi05"]
# TODO: Add all other robots
SUPPORTED_ROBOTS = ["so100_follower", "so101_follower"]
SUPPORTED_ROBOTS = ["so100_follower", "so101_follower", "bi_so100_follower"]
+2 -13
View File
@@ -62,15 +62,6 @@ def visualize_action_queue_size(action_queue_size: list[int]) -> None:
plt.show()
def validate_robot_cameras_for_policy(
lerobot_observation_features: dict[str, dict], policy_image_features: dict[str, PolicyFeature]
) -> None:
image_keys = list(filter(is_image_key, lerobot_observation_features))
assert set(image_keys) == set(policy_image_features.keys()), (
f"Policy image features must match robot cameras! Received {list(policy_image_features.keys())} != {image_keys}"
)
def map_robot_keys_to_lerobot_features(robot: Robot) -> dict[str, dict]:
return hw_to_dataset_features(robot.observation_features, OBS_STR, use_video=False)
@@ -92,11 +83,11 @@ def resize_robot_observation_image(image: torch.tensor, resize_dims: tuple[int,
return resized.squeeze(0)
# TODO(Steven): Consider implementing a pipeline step for this
def raw_observation_to_observation(
raw_observation: RawObservation,
lerobot_features: dict[str, dict],
policy_image_features: dict[str, PolicyFeature],
device: str,
) -> Observation:
observation = {}
@@ -105,9 +96,7 @@ def raw_observation_to_observation(
if isinstance(v, torch.Tensor): # VLAs present natural-language instructions in observations
if "image" in k:
# Policy expects images in shape (B, C, H, W)
observation[k] = prepare_image(v).unsqueeze(0).to(device)
else:
observation[k] = v.to(device)
observation[k] = prepare_image(v).unsqueeze(0)
else:
observation[k] = v
+74 -42
View File
@@ -15,7 +15,7 @@
"""
Example:
```shell
python src/lerobot/async_inference/policy_server.py \
python -m lerobot.async_inference.policy_server \
--host=127.0.0.1 \
--port=8080 \
--fps=30 \
@@ -32,12 +32,17 @@ from concurrent import futures
from dataclasses import asdict
from pprint import pformat
from queue import Empty, Queue
from typing import Any
import draccus
import grpc
import torch
from lerobot.policies.factory import get_policy_class
from lerobot.policies.factory import get_policy_class, make_pre_post_processors
from lerobot.processor import (
PolicyAction,
PolicyProcessorPipeline,
)
from lerobot.transport import (
services_pb2, # type: ignore
services_pb2_grpc, # type: ignore
@@ -82,6 +87,8 @@ class PolicyServer(services_pb2_grpc.AsyncInferenceServicer):
self.lerobot_features = None
self.actions_per_chunk = None
self.policy = None
self.preprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]] | None = None
self.postprocessor: PolicyProcessorPipeline[PolicyAction, PolicyAction] | None = None
@property
def running(self):
@@ -146,6 +153,16 @@ class PolicyServer(services_pb2_grpc.AsyncInferenceServicer):
start = time.perf_counter()
self.policy = policy_class.from_pretrained(policy_specs.pretrained_name_or_path)
self.policy.to(self.device)
# Load preprocessor and postprocessor, overriding device to match requested device
device_override = {"device": self.device}
self.preprocessor, self.postprocessor = make_pre_post_processors(
self.policy.config,
pretrained_path=policy_specs.pretrained_name_or_path,
preprocessor_overrides={"device_processor": device_override},
postprocessor_overrides={"device_processor": device_override},
)
end = time.perf_counter()
self.logger.info(f"Time taken to put policy on {self.device}: {end - start:.4f} seconds")
@@ -173,7 +190,7 @@ class PolicyServer(services_pb2_grpc.AsyncInferenceServicer):
# Calculate FPS metrics
fps_metrics = self.fps_tracker.calculate_fps_metrics(obs_timestamp)
self.logger.info(
self.logger.debug(
f"Received observation #{obs_timestep} | "
f"Avg FPS: {fps_metrics['avg_fps']:.2f} | " # fps at which observations are received from client
f"Target: {fps_metrics['target_fps']:.2f} | "
@@ -189,7 +206,7 @@ class PolicyServer(services_pb2_grpc.AsyncInferenceServicer):
if not self._enqueue_observation(
timed_observation # wrapping a RawObservation
):
self.logger.info(f"Observation #{obs_timestep} has been filtered out")
self.logger.debug(f"Observation #{obs_timestep} has been filtered out")
return services_pb2.Empty()
@@ -301,23 +318,6 @@ class PolicyServer(services_pb2_grpc.AsyncInferenceServicer):
for i, action in enumerate(action_chunk)
]
def _prepare_observation(self, observation_t: TimedObservation) -> Observation:
"""
Prepare observation, ready for policy inference.
E.g.: To keep observation sampling rate high (and network packet tiny) we send int8 [0,255] images from the
client and then convert them to float32 [0,1] images here, before running inference.
"""
# RawObservation from robot.get_observation() - wrong keys, wrong dtype, wrong image shape
observation: Observation = raw_observation_to_observation(
observation_t.get_observation(),
self.lerobot_features,
self.policy_image_features,
self.device,
)
# processed Observation - right keys, right dtype, right image shape
return observation
def _get_action_chunk(self, observation: dict[str, torch.Tensor]) -> torch.Tensor:
"""Get an action chunk from the policy. The chunk contains only"""
chunk = self.policy.predict_action_chunk(observation)
@@ -327,44 +327,76 @@ class PolicyServer(services_pb2_grpc.AsyncInferenceServicer):
return chunk[:, : self.actions_per_chunk, :]
def _predict_action_chunk(self, observation_t: TimedObservation) -> list[TimedAction]:
"""Predict an action chunk based on an observation"""
inference_starts = time.perf_counter()
"""Predict an action chunk based on an observation.
Pipeline:
1. Convert raw observation to LeRobot format
2. Apply preprocessor (tokenization, normalization, batching, device placement)
3. Run policy inference to get action chunk
4. Apply postprocessor (unnormalization, device movement)
5. Convert to TimedAction list
"""
"""1. Prepare observation"""
start_time = time.perf_counter()
observation = self._prepare_observation(observation_t)
preprocessing_time = time.perf_counter() - start_time
start_prepare = time.perf_counter()
observation: Observation = raw_observation_to_observation(
observation_t.get_observation(),
self.lerobot_features,
self.policy_image_features,
)
prepare_time = time.perf_counter() - start_prepare
"""2. Apply preprocessor"""
start_preprocess = time.perf_counter()
observation = self.preprocessor(observation)
self.last_processed_obs: TimedObservation = observation_t
preprocessing_time = time.perf_counter() - start_preprocess
"""2. Get action chunk"""
start_time = time.perf_counter()
"""3. Get action chunk"""
start_inference = time.perf_counter()
action_tensor = self._get_action_chunk(observation)
inference_time = time.perf_counter() - start_time
inference_time = time.perf_counter() - start_inference
self.logger.info(
f"Preprocessing and inference took {inference_time:.4f}s, action shape: {action_tensor.shape}"
)
"""3. Post-inference processing"""
start_time = time.perf_counter()
# Move to CPU before serializing
action_tensor = action_tensor.cpu().squeeze(0)
"""4. Apply postprocessor"""
# Apply postprocessor (handles unnormalization and device movement)
# Postprocessor expects (B, action_dim) per action, but we have (B, chunk_size, action_dim)
# So we process each action in the chunk individually
start_postprocess = time.perf_counter()
_, chunk_size, _ = action_tensor.shape
# Process each action in the chunk
processed_actions = []
for i in range(chunk_size):
# Extract action at timestep i: (B, action_dim)
single_action = action_tensor[:, i, :]
processed_action = self.postprocessor(single_action)
processed_actions.append(processed_action)
# Stack back to (B, chunk_size, action_dim), then remove batch dim
action_tensor = torch.stack(processed_actions, dim=1).squeeze(0)
self.logger.debug(f"Postprocessed action shape: {action_tensor.shape}")
"""5. Convert to TimedAction list"""
action_chunk = self._time_action_chunk(
observation_t.get_timestamp(), list(action_tensor), observation_t.get_timestep()
)
postprocessing_time = time.perf_counter() - start_time
inference_stops = time.perf_counter()
postprocess_stops = time.perf_counter()
postprocessing_time = postprocess_stops - start_postprocess
self.logger.info(
f"Observation {observation_t.get_timestep()} |"
f"Inference time: {1000 * (inference_stops - inference_starts):.2f}ms"
f"Observation {observation_t.get_timestep()} | "
f"Total time: {1000 * (postprocess_stops - start_prepare):.2f}ms"
)
# full-process latency breakdown for debugging purposes
self.logger.debug(
f"Observation {observation_t.get_timestep()} | "
f"Preprocessing time: {1000 * (preprocessing_time - inference_starts):.2f}ms | "
f"Inference time: {1000 * (inference_time - preprocessing_time):.2f}ms | "
f"Postprocessing time: {1000 * (postprocessing_time - inference_time):.2f}ms | "
f"Total time: {1000 * (postprocessing_time - inference_starts):.2f}ms"
f"Prepare time: {1000 * prepare_time:.2f}ms | "
f"Preprocessing time: {1000 * preprocessing_time:.2f}ms | "
f"Inference time: {1000 * inference_time:.2f}ms | "
f"Postprocessing time: {1000 * postprocessing_time:.2f}ms | "
f"Total time: {1000 * (postprocess_stops - start_prepare):.2f}ms"
)
return action_chunk
+3 -12
View File
@@ -48,10 +48,10 @@ import torch
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
from lerobot.configs.policies import PreTrainedConfig
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_so100_follower,
koch_follower,
make_robot_from_config,
so100_follower,
@@ -75,7 +75,6 @@ from .helpers import (
TimedObservation,
get_logger,
map_robot_keys_to_lerobot_features,
validate_robot_cameras_for_policy,
visualize_action_queue_size,
)
@@ -97,14 +96,6 @@ class RobotClient:
lerobot_features = map_robot_keys_to_lerobot_features(self.robot)
if config.verify_robot_cameras:
# Load policy config for validation
policy_config = PreTrainedConfig.from_pretrained(config.pretrained_name_or_path)
policy_image_features = policy_config.image_features
# The cameras specified for inference must match the one supported by the policy chosen
validate_robot_cameras_for_policy(lerobot_features, policy_image_features)
# Use environment variable if server_address is not provided in config
self.server_address = config.server_address
@@ -214,7 +205,7 @@ class RobotClient:
)
_ = self.stub.SendObservations(observation_iterator)
obs_timestep = obs.get_timestep()
self.logger.info(f"Sent observation #{obs_timestep} | ")
self.logger.debug(f"Sent observation #{obs_timestep} | ")
return True
@@ -467,7 +458,7 @@ class RobotClient:
if self._ready_to_send_observation():
_captured_observation = self.control_loop_observation(task, verbose)
self.logger.info(f"Control loop (ms): {(time.perf_counter() - control_loop_start) * 1000:.2f}")
self.logger.debug(f"Control loop (ms): {(time.perf_counter() - control_loop_start) * 1000:.2f}")
# Dynamically adjust sleep time to maintain the desired control frequency
time.sleep(max(0, self.config.environment_dt - (time.perf_counter() - control_loop_start)))
+9 -2
View File
@@ -15,15 +15,19 @@
# limitations under the License.
import platform
from typing import cast
from lerobot.utils.import_utils import make_device_from_device_class
from .camera import Camera
from .configs import CameraConfig, Cv2Rotation
def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[str, Camera]:
cameras = {}
cameras: dict[str, Camera] = {}
for key, cfg in camera_configs.items():
# TODO(Steven): Consider just using the make_device_from_device_class for all types
if cfg.type == "opencv":
from .opencv import OpenCVCamera
@@ -40,7 +44,10 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[s
cameras[key] = Reachy2Camera(cfg)
else:
raise ValueError(f"The camera type '{cfg.type}' is not valid.")
try:
cameras[key] = cast(Camera, make_device_from_device_class(cfg))
except Exception as e:
raise ValueError(f"Error creating camera {key} with config {cfg}: {e}") from e
return cameras
+62 -26
View File
@@ -31,15 +31,15 @@ from lerobot.datasets.utils import (
DEFAULT_EPISODES_PATH,
DEFAULT_VIDEO_FILE_SIZE_IN_MB,
DEFAULT_VIDEO_PATH,
get_file_size_in_mb,
get_parquet_file_size_in_mb,
get_video_size_in_mb,
to_parquet_with_hf_images,
update_chunk_file_indices,
write_info,
write_stats,
write_tasks,
)
from lerobot.datasets.video_utils import concatenate_video_files
from lerobot.datasets.video_utils import concatenate_video_files, get_video_duration_in_s
def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
@@ -130,10 +130,34 @@ def update_meta_data(
df["data/chunk_index"] = df["data/chunk_index"] + data_idx["chunk"]
df["data/file_index"] = df["data/file_index"] + data_idx["file"]
for key, video_idx in videos_idx.items():
df[f"videos/{key}/chunk_index"] = df[f"videos/{key}/chunk_index"] + video_idx["chunk"]
df[f"videos/{key}/file_index"] = df[f"videos/{key}/file_index"] + video_idx["file"]
df[f"videos/{key}/from_timestamp"] = df[f"videos/{key}/from_timestamp"] + video_idx["latest_duration"]
df[f"videos/{key}/to_timestamp"] = df[f"videos/{key}/to_timestamp"] + video_idx["latest_duration"]
# Store original video file indices before updating
orig_chunk_col = f"videos/{key}/chunk_index"
orig_file_col = f"videos/{key}/file_index"
df["_orig_chunk"] = df[orig_chunk_col].copy()
df["_orig_file"] = df[orig_file_col].copy()
# Update chunk and file indices to point to destination
df[orig_chunk_col] = video_idx["chunk"]
df[orig_file_col] = video_idx["file"]
# Apply per-source-file timestamp offsets
src_to_offset = video_idx.get("src_to_offset", {})
if src_to_offset:
# Apply offset based on original source file
for idx in df.index:
src_key = (df.at[idx, "_orig_chunk"], df.at[idx, "_orig_file"])
offset = src_to_offset.get(src_key, 0)
df.at[idx, f"videos/{key}/from_timestamp"] += offset
df.at[idx, f"videos/{key}/to_timestamp"] += offset
else:
# Fallback to simple offset (for backward compatibility)
df[f"videos/{key}/from_timestamp"] = (
df[f"videos/{key}/from_timestamp"] + video_idx["latest_duration"]
)
df[f"videos/{key}/to_timestamp"] = df[f"videos/{key}/to_timestamp"] + video_idx["latest_duration"]
# Clean up temporary columns
df = df.drop(columns=["_orig_chunk", "_orig_file"])
df["dataset_from_index"] = df["dataset_from_index"] + dst_meta.info["total_frames"]
df["dataset_to_index"] = df["dataset_to_index"] + dst_meta.info["total_frames"]
@@ -193,6 +217,10 @@ def aggregate_datasets(
robot_type=robot_type,
features=features,
root=aggr_root,
use_videos=len(video_keys) > 0,
chunks_size=chunk_size,
data_files_size_in_mb=data_files_size_in_mb,
video_files_size_in_mb=video_files_size_in_mb,
)
logging.info("Find all tasks")
@@ -236,6 +264,11 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
Returns:
dict: Updated videos_idx with current chunk and file indices.
"""
for key in videos_idx:
videos_idx[key]["episode_duration"] = 0
# Track offset for each source (chunk, file) pair
videos_idx[key]["src_to_offset"] = {}
for key, video_idx in videos_idx.items():
unique_chunk_file_pairs = {
(chunk, file)
@@ -249,6 +282,7 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
chunk_idx = video_idx["chunk"]
file_idx = video_idx["file"]
current_offset = video_idx["latest_duration"]
for src_chunk_idx, src_file_idx in unique_chunk_file_pairs:
src_path = src_meta.root / DEFAULT_VIDEO_PATH.format(
@@ -263,21 +297,25 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
file_index=file_idx,
)
# If a new file is created, we don't want to increment the latest_duration
update_latest_duration = False
src_duration = get_video_duration_in_s(src_path)
if not dst_path.exists():
# First write to this destination file
# Store offset before incrementing
videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = current_offset
dst_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(str(src_path), str(dst_path))
continue # not accumulating further, already copied the file in place
videos_idx[key]["episode_duration"] += src_duration
current_offset += src_duration
continue
# Check file sizes before appending
src_size = get_video_size_in_mb(src_path)
dst_size = get_video_size_in_mb(dst_path)
src_size = get_file_size_in_mb(src_path)
dst_size = get_file_size_in_mb(dst_path)
if dst_size + src_size >= video_files_size_in_mb:
# Rotate to a new chunk/file
# Rotate to a new file, this source becomes start of new destination
# So its offset should be 0
videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = 0
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, chunk_size)
dst_path = dst_meta.root / DEFAULT_VIDEO_PATH.format(
video_key=key,
@@ -286,25 +324,22 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu
)
dst_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(str(src_path), str(dst_path))
# Reset offset for next file
current_offset = src_duration
else:
# Get the timestamps shift for this video
timestamps_shift_s = dst_meta.info["total_frames"] / dst_meta.info["fps"]
# Append to existing video file
# Append to existing video file - use current accumulated offset
videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = current_offset
concatenate_video_files(
[dst_path, src_path],
dst_path,
)
# Update the latest_duration when appending (shifts timestamps!)
update_latest_duration = not update_latest_duration
current_offset += src_duration
videos_idx[key]["episode_duration"] += src_duration
# Update the videos_idx with the final chunk and file indices for this key
videos_idx[key]["chunk"] = chunk_idx
videos_idx[key]["file"] = file_idx
if update_latest_duration:
videos_idx[key]["latest_duration"] += timestamps_shift_s
return videos_idx
@@ -389,9 +424,6 @@ def aggregate_metadata(src_meta, dst_meta, meta_idx, data_idx, videos_idx):
videos_idx,
)
for k in videos_idx:
videos_idx[k]["latest_duration"] += videos_idx[k]["episode_duration"]
meta_idx = append_or_create_parquet_file(
df,
src_path,
@@ -403,6 +435,10 @@ def aggregate_metadata(src_meta, dst_meta, meta_idx, data_idx, videos_idx):
aggr_root=dst_meta.root,
)
# Increment latest_duration by the total duration added from this source dataset
for k in videos_idx:
videos_idx[k]["latest_duration"] += videos_idx[k]["episode_duration"]
return meta_idx
File diff suppressed because it is too large Load Diff
+25 -2
View File
@@ -68,7 +68,30 @@ def image_array_to_pil_image(image_array: np.ndarray, range_check: bool = True)
return PIL.Image.fromarray(image_array)
def write_image(image: np.ndarray | PIL.Image.Image, fpath: Path):
def write_image(image: np.ndarray | PIL.Image.Image, fpath: Path, compress_level: int = 1):
"""
Saves a NumPy array or PIL Image to a file.
This function handles both NumPy arrays and PIL Image objects, converting
the former to a PIL Image before saving. It includes error handling for
the save operation.
Args:
image (np.ndarray | PIL.Image.Image): The image data to save.
fpath (Path): The destination file path for the image.
compress_level (int, optional): The compression level for the saved
image, as used by PIL.Image.save(). Defaults to 1.
Refer to: https://github.com/huggingface/lerobot/pull/2135
for more details on the default value rationale.
Raises:
TypeError: If the input 'image' is not a NumPy array or a
PIL.Image.Image object.
Side Effects:
Prints an error message to the console if the image writing process
fails for any reason.
"""
try:
if isinstance(image, np.ndarray):
img = image_array_to_pil_image(image)
@@ -76,7 +99,7 @@ def write_image(image: np.ndarray | PIL.Image.Image, fpath: Path):
img = image
else:
raise TypeError(f"Unsupported image type: {type(image)}")
img.save(fpath)
img.save(fpath, compress_level=compress_level)
except Exception as e:
print(f"Error writing image {fpath}: {e}")
+259 -98
View File
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import gc
import logging
import shutil
import tempfile
@@ -26,6 +25,8 @@ import numpy as np
import packaging.version
import pandas as pd
import PIL.Image
import pyarrow as pa
import pyarrow.parquet as pq
import torch
import torch.utils
from huggingface_hub import HfApi, snapshot_download
@@ -46,13 +47,9 @@ from lerobot.datasets.utils import (
embed_images,
flatten_dict,
get_delta_indices,
get_hf_dataset_cache_dir,
get_hf_dataset_size_in_mb,
get_file_size_in_mb,
get_hf_features_from_features,
get_parquet_file_size_in_mb,
get_parquet_num_frames,
get_safe_version,
get_video_size_in_mb,
hf_transform_to_torch,
is_valid_version,
load_episodes,
@@ -60,7 +57,6 @@ from lerobot.datasets.utils import (
load_nested_dataset,
load_stats,
load_tasks,
to_parquet_with_hf_images,
update_chunk_file_indices,
validate_episode_buffer,
validate_frame,
@@ -90,10 +86,15 @@ class LeRobotDatasetMetadata:
root: str | Path | None = None,
revision: str | None = None,
force_cache_sync: bool = False,
metadata_buffer_size: int = 10,
):
self.repo_id = repo_id
self.revision = revision if revision else CODEBASE_VERSION
self.root = Path(root) if root is not None else HF_LEROBOT_HOME / repo_id
self.writer = None
self.latest_episode = None
self.metadata_buffer: list[dict] = []
self.metadata_buffer_size = metadata_buffer_size
try:
if force_cache_sync:
@@ -107,6 +108,54 @@ class LeRobotDatasetMetadata:
self.pull_from_repo(allow_patterns="meta/")
self.load_metadata()
def _flush_metadata_buffer(self) -> None:
"""Write all buffered episode metadata to parquet file."""
if not hasattr(self, "metadata_buffer") or len(self.metadata_buffer) == 0:
return
combined_dict = {}
for episode_dict in self.metadata_buffer:
for key, value in episode_dict.items():
if key not in combined_dict:
combined_dict[key] = []
# Extract value and serialize numpy arrays
# because PyArrow's from_pydict function doesn't support numpy arrays
val = value[0] if isinstance(value, list) else value
combined_dict[key].append(val.tolist() if isinstance(val, np.ndarray) else val)
first_ep = self.metadata_buffer[0]
chunk_idx = first_ep["meta/episodes/chunk_index"][0]
file_idx = first_ep["meta/episodes/file_index"][0]
table = pa.Table.from_pydict(combined_dict)
if not self.writer:
path = Path(self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx))
path.parent.mkdir(parents=True, exist_ok=True)
self.writer = pq.ParquetWriter(
path, schema=table.schema, compression="snappy", use_dictionary=True
)
self.writer.write_table(table)
self.latest_episode = self.metadata_buffer[-1]
self.metadata_buffer.clear()
def _close_writer(self) -> None:
"""Close and cleanup the parquet writer if it exists."""
self._flush_metadata_buffer()
writer = getattr(self, "writer", None)
if writer is not None:
writer.close()
self.writer = None
def __del__(self):
"""
Trust the user to call .finalize() but as an added safety check call the parquet writer to stop when calling the destructor
"""
self._close_writer()
def load_metadata(self):
self.info = load_info(self.root)
check_version_compatibility(self.repo_id, self._version, CODEBASE_VERSION)
@@ -138,6 +187,12 @@ class LeRobotDatasetMetadata:
return packaging.version.parse(self.info["codebase_version"])
def get_data_file_path(self, ep_index: int) -> Path:
if self.episodes is None:
self.episodes = load_episodes(self.root)
if ep_index >= len(self.episodes):
raise IndexError(
f"Episode index {ep_index} out of range. Episodes: {len(self.episodes) if self.episodes else 0}"
)
ep = self.episodes[ep_index]
chunk_idx = ep["data/chunk_index"]
file_idx = ep["data/file_index"]
@@ -145,6 +200,12 @@ class LeRobotDatasetMetadata:
return Path(fpath)
def get_video_file_path(self, ep_index: int, vid_key: str) -> Path:
if self.episodes is None:
self.episodes = load_episodes(self.root)
if ep_index >= len(self.episodes):
raise IndexError(
f"Episode index {ep_index} out of range. Episodes: {len(self.episodes) if self.episodes else 0}"
)
ep = self.episodes[ep_index]
chunk_idx = ep[f"videos/{vid_key}/chunk_index"]
file_idx = ep[f"videos/{vid_key}/file_index"]
@@ -260,72 +321,75 @@ class LeRobotDatasetMetadata:
write_tasks(self.tasks, self.root)
def _save_episode_metadata(self, episode_dict: dict) -> None:
"""Save episode metadata to a parquet file and update the Hugging Face dataset of episodes metadata.
"""Buffer episode metadata and write to parquet in batches for efficiency.
This function processes episodes metadata from a dictionary, converts it into a Hugging Face dataset,
and saves it as a parquet file. It handles both the creation of new parquet files and the
updating of existing ones based on size constraints. After saving the metadata, it reloads
the Hugging Face dataset to ensure it is up-to-date.
This function accumulates episode metadata in a buffer and flushes it when the buffer
reaches the configured size. This reduces I/O overhead by writing multiple episodes
at once instead of one row at a time.
Notes: We both need to update parquet files and HF dataset:
- `pandas` loads parquet file in RAM
- `datasets` relies on a memory mapping from pyarrow (no RAM). It either converts parquet files to a pyarrow cache on disk,
or loads directly from pyarrow cache.
"""
# Convert buffer into HF Dataset
# Convert to list format for each value
episode_dict = {key: [value] for key, value in episode_dict.items()}
ep_dataset = datasets.Dataset.from_dict(episode_dict)
ep_size_in_mb = get_hf_dataset_size_in_mb(ep_dataset)
df = pd.DataFrame(ep_dataset)
num_frames = episode_dict["length"][0]
if self.episodes is None:
if self.latest_episode is None:
# Initialize indices and frame count for a new dataset made of the first episode data
chunk_idx, file_idx = 0, 0
df["meta/episodes/chunk_index"] = [chunk_idx]
df["meta/episodes/file_index"] = [file_idx]
df["dataset_from_index"] = [0]
df["dataset_to_index"] = [num_frames]
else:
# Retrieve information from the latest parquet file
latest_ep = self.episodes[-1]
chunk_idx = latest_ep["meta/episodes/chunk_index"]
file_idx = latest_ep["meta/episodes/file_index"]
if self.episodes is not None and len(self.episodes) > 0:
# It means we are resuming recording, so we need to load the latest episode
# Update the indices to avoid overwriting the latest episode
chunk_idx = self.episodes[-1]["meta/episodes/chunk_index"]
file_idx = self.episodes[-1]["meta/episodes/file_index"]
latest_num_frames = self.episodes[-1]["dataset_to_index"]
episode_dict["dataset_from_index"] = [latest_num_frames]
episode_dict["dataset_to_index"] = [latest_num_frames + num_frames]
latest_path = self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
latest_size_in_mb = get_parquet_file_size_in_mb(latest_path)
if latest_size_in_mb + ep_size_in_mb >= self.data_files_size_in_mb:
# Size limit is reached, prepare new parquet file
# When resuming, move to the next file
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.chunks_size)
else:
episode_dict["dataset_from_index"] = [0]
episode_dict["dataset_to_index"] = [num_frames]
episode_dict["meta/episodes/chunk_index"] = [chunk_idx]
episode_dict["meta/episodes/file_index"] = [file_idx]
else:
chunk_idx = self.latest_episode["meta/episodes/chunk_index"][0]
file_idx = self.latest_episode["meta/episodes/file_index"][0]
latest_path = (
self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
if self.writer is None
else self.writer.where
)
if Path(latest_path).exists():
latest_size_in_mb = get_file_size_in_mb(Path(latest_path))
latest_num_frames = self.latest_episode["episode_index"][0]
av_size_per_frame = latest_size_in_mb / latest_num_frames if latest_num_frames > 0 else 0.0
if latest_size_in_mb + av_size_per_frame * num_frames >= self.data_files_size_in_mb:
# Size limit is reached, flush buffer and prepare new parquet file
self._flush_metadata_buffer()
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.chunks_size)
self._close_writer()
# Update the existing pandas dataframe with new row
df["meta/episodes/chunk_index"] = [chunk_idx]
df["meta/episodes/file_index"] = [file_idx]
df["dataset_from_index"] = [latest_ep["dataset_to_index"]]
df["dataset_to_index"] = [latest_ep["dataset_to_index"] + num_frames]
episode_dict["meta/episodes/chunk_index"] = [chunk_idx]
episode_dict["meta/episodes/file_index"] = [file_idx]
episode_dict["dataset_from_index"] = [self.latest_episode["dataset_to_index"][0]]
episode_dict["dataset_to_index"] = [self.latest_episode["dataset_to_index"][0] + num_frames]
if latest_size_in_mb + ep_size_in_mb < self.data_files_size_in_mb:
# Size limit wasnt reached, concatenate latest dataframe with new one
latest_df = pd.read_parquet(latest_path)
df = pd.concat([latest_df, df], ignore_index=True)
# Add to buffer
self.metadata_buffer.append(episode_dict)
self.latest_episode = episode_dict
# Memort optimization
del latest_df
gc.collect()
# Write the resulting dataframe from RAM to disk
path = self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(path, index=False)
if self.episodes is not None:
# Remove the episodes cache directory, necessary to avoid cache bloat
cached_dir = get_hf_dataset_cache_dir(self.episodes)
if cached_dir is not None:
shutil.rmtree(cached_dir)
self.episodes = load_episodes(self.root)
if len(self.metadata_buffer) >= self.metadata_buffer_size:
self._flush_metadata_buffer()
def save_episode(
self,
@@ -438,6 +502,10 @@ class LeRobotDatasetMetadata:
robot_type: str | None = None,
root: str | Path | None = None,
use_videos: bool = True,
metadata_buffer_size: int = 10,
chunks_size: int | None = None,
data_files_size_in_mb: int | None = None,
video_files_size_in_mb: int | None = None,
) -> "LeRobotDatasetMetadata":
"""Creates metadata for a LeRobotDataset."""
obj = cls.__new__(cls)
@@ -452,11 +520,24 @@ class LeRobotDatasetMetadata:
obj.tasks = None
obj.episodes = None
obj.stats = None
obj.info = create_empty_dataset_info(CODEBASE_VERSION, fps, features, use_videos, robot_type)
obj.info = create_empty_dataset_info(
CODEBASE_VERSION,
fps,
features,
use_videos,
robot_type,
chunks_size,
data_files_size_in_mb,
video_files_size_in_mb,
)
if len(obj.video_keys) > 0 and not use_videos:
raise ValueError()
write_json(obj.info, obj.root / INFO_PATH)
obj.revision = None
obj.writer = None
obj.latest_episode = None
obj.metadata_buffer = []
obj.metadata_buffer_size = metadata_buffer_size
return obj
@@ -603,6 +684,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
# Unused attributes
self.image_writer = None
self.episode_buffer = None
self.writer = None
self.latest_episode = None
self.root.mkdir(exist_ok=True, parents=True)
@@ -611,6 +694,11 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.repo_id, self.root, self.revision, force_cache_sync=force_cache_sync
)
# Track dataset state for efficient incremental writing
self._lazy_loading = False
self._recorded_frames = self.meta.total_frames
self._writer_closed_for_reading = False
# Load actual data
try:
if force_cache_sync:
@@ -629,6 +717,19 @@ class LeRobotDataset(torch.utils.data.Dataset):
check_delta_timestamps(self.delta_timestamps, self.fps, self.tolerance_s)
self.delta_indices = get_delta_indices(self.delta_timestamps, self.fps)
def _close_writer(self) -> None:
"""Close and cleanup the parquet writer if it exists."""
writer = getattr(self, "writer", None)
if writer is not None:
writer.close()
self.writer = None
def __del__(self):
"""
Trust the user to call .finalize() but as an added safety check call the parquet writer to stop when calling the destructor
"""
self._close_writer()
def push_to_hub(
self,
branch: str | None = None,
@@ -769,8 +870,15 @@ class LeRobotDataset(torch.utils.data.Dataset):
@property
def num_frames(self) -> int:
"""Number of frames in selected episodes."""
return len(self.hf_dataset) if self.hf_dataset is not None else self.meta.total_frames
"""Number of frames in selected episodes.
Note: When episodes a subset of the full dataset is requested, we must return the
actual loaded data length (len(self.hf_dataset)) rather than metadata total_frames.
self.meta.total_frames is the total number of frames in the full dataset.
"""
if self.episodes is not None and self.hf_dataset is not None:
return len(self.hf_dataset)
return self.meta.total_frames
@property
def num_episodes(self) -> int:
@@ -848,10 +956,22 @@ class LeRobotDataset(torch.utils.data.Dataset):
return item
def _ensure_hf_dataset_loaded(self):
"""Lazy load the HF dataset only when needed for reading."""
if self._lazy_loading or self.hf_dataset is None:
# Close the writer before loading to ensure parquet file is properly finalized
if self.writer is not None:
self._close_writer()
self._writer_closed_for_reading = True
self.hf_dataset = self.load_hf_dataset()
self._lazy_loading = False
def __len__(self):
return self.num_frames
def __getitem__(self, idx) -> dict:
# Ensure dataset is loaded when we actually need to read from it
self._ensure_hf_dataset_loaded()
item = self.hf_dataset[idx]
ep_idx = item["episode_index"].item()
@@ -890,6 +1010,14 @@ class LeRobotDataset(torch.utils.data.Dataset):
"})',\n"
)
def finalize(self):
"""
Close the parquet writers. This function needs to be called after data collection/conversion, else footer metadata won't be written to the parquet files.
The dataset won't be valid and can't be loaded as ds = LeRobotDataset(repo_id=repo, root=HF_LEROBOT_HOME.joinpath(repo))
"""
self._close_writer()
self.meta._close_writer()
def create_episode_buffer(self, episode_index: int | None = None) -> dict:
current_ep_idx = self.meta.total_episodes if episode_index is None else episode_index
ep_buffer = {}
@@ -1097,74 +1225,101 @@ class LeRobotDataset(torch.utils.data.Dataset):
ep_dict = {key: episode_buffer[key] for key in self.hf_features}
ep_dataset = datasets.Dataset.from_dict(ep_dict, features=self.hf_features, split="train")
ep_dataset = embed_images(ep_dataset)
ep_size_in_mb = get_hf_dataset_size_in_mb(ep_dataset)
ep_num_frames = len(ep_dataset)
df = pd.DataFrame(ep_dataset)
if self.meta.episodes is None:
if self.latest_episode is None:
# Initialize indices and frame count for a new dataset made of the first episode data
chunk_idx, file_idx = 0, 0
latest_num_frames = 0
global_frame_index = 0
# However, if the episodes already exists
# It means we are resuming recording, so we need to load the latest episode
# Update the indices to avoid overwriting the latest episode
if self.meta.episodes is not None and len(self.meta.episodes) > 0:
latest_ep = self.meta.episodes[-1]
global_frame_index = latest_ep["dataset_to_index"]
chunk_idx = latest_ep["data/chunk_index"]
file_idx = latest_ep["data/file_index"]
# When resuming, move to the next file
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.meta.chunks_size)
else:
# Retrieve information from the latest parquet file
latest_ep = self.meta.episodes[-1]
latest_ep = self.latest_episode
chunk_idx = latest_ep["data/chunk_index"]
file_idx = latest_ep["data/file_index"]
global_frame_index = latest_ep["index"][-1] + 1
latest_path = self.root / self.meta.data_path.format(chunk_index=chunk_idx, file_index=file_idx)
latest_size_in_mb = get_parquet_file_size_in_mb(latest_path)
latest_num_frames = get_parquet_num_frames(latest_path)
latest_size_in_mb = get_file_size_in_mb(latest_path)
frames_in_current_file = global_frame_index - latest_ep["dataset_from_index"]
av_size_per_frame = (
latest_size_in_mb / frames_in_current_file if frames_in_current_file > 0 else 0
)
# Determine if a new parquet file is needed
if latest_size_in_mb + ep_size_in_mb >= self.meta.data_files_size_in_mb:
# Size limit is reached, prepare new parquet file
if (
latest_size_in_mb + av_size_per_frame * ep_num_frames >= self.meta.data_files_size_in_mb
or self._writer_closed_for_reading
):
# Size limit is reached or writer was closed for reading, prepare new parquet file
chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.meta.chunks_size)
latest_num_frames = 0
else:
# Update the existing parquet file with new rows
latest_df = pd.read_parquet(latest_path)
df = pd.concat([latest_df, df], ignore_index=True)
self._close_writer()
self._writer_closed_for_reading = False
# Memort optimization
del latest_df
gc.collect()
ep_dict["data/chunk_index"] = chunk_idx
ep_dict["data/file_index"] = file_idx
# Write the resulting dataframe from RAM to disk
path = self.root / self.meta.data_path.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True)
if len(self.meta.image_keys) > 0:
to_parquet_with_hf_images(df, path)
else:
df.to_parquet(path)
if self.hf_dataset is not None:
# Remove hf dataset cache directory, necessary to avoid cache bloat
cached_dir = get_hf_dataset_cache_dir(self.hf_dataset)
if cached_dir is not None:
shutil.rmtree(cached_dir)
self.hf_dataset = self.load_hf_dataset()
table = ep_dataset.with_format("arrow")[:]
if not self.writer:
self.writer = pq.ParquetWriter(
path, schema=table.schema, compression="snappy", use_dictionary=True
)
self.writer.write_table(table)
metadata = {
"data/chunk_index": chunk_idx,
"data/file_index": file_idx,
"dataset_from_index": latest_num_frames,
"dataset_to_index": latest_num_frames + ep_num_frames,
"dataset_from_index": global_frame_index,
"dataset_to_index": global_frame_index + ep_num_frames,
}
# Store metadata with episode data for next episode
self.latest_episode = {**ep_dict, **metadata}
# Mark that the HF dataset needs reloading (lazy loading approach)
# This avoids expensive reloading during sequential recording
self._lazy_loading = True
# Update recorded frames count for efficient length tracking
self._recorded_frames += ep_num_frames
return metadata
def _save_episode_video(self, video_key: str, episode_index: int) -> dict:
# Encode episode frames into a temporary video
ep_path = self._encode_temporary_episode_video(video_key, episode_index)
ep_size_in_mb = get_video_size_in_mb(ep_path)
ep_size_in_mb = get_file_size_in_mb(ep_path)
ep_duration_in_s = get_video_duration_in_s(ep_path)
if self.meta.episodes is None or (
f"videos/{video_key}/chunk_index" not in self.meta.episodes.column_names
or f"videos/{video_key}/file_index" not in self.meta.episodes.column_names
if (
episode_index == 0
or self.meta.latest_episode is None
or f"videos/{video_key}/chunk_index" not in self.meta.latest_episode
):
# Initialize indices for a new dataset made of the first episode data
chunk_idx, file_idx = 0, 0
if self.meta.episodes is not None and len(self.meta.episodes) > 0:
# It means we are resuming recording, so we need to load the latest episode
# Update the indices to avoid overwriting the latest episode
old_chunk_idx = self.meta.episodes[-1][f"videos/{video_key}/chunk_index"]
old_file_idx = self.meta.episodes[-1][f"videos/{video_key}/file_index"]
chunk_idx, file_idx = update_chunk_file_indices(
old_chunk_idx, old_file_idx, self.meta.chunks_size
)
latest_duration_in_s = 0.0
new_path = self.root / self.meta.video_path.format(
video_key=video_key, chunk_index=chunk_idx, file_index=file_idx
@@ -1172,16 +1327,16 @@ class LeRobotDataset(torch.utils.data.Dataset):
new_path.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(ep_path), str(new_path))
else:
# Retrieve information from the latest updated video file (possibly several episodes ago)
latest_ep = self.meta.episodes[episode_index - 1]
chunk_idx = latest_ep[f"videos/{video_key}/chunk_index"]
file_idx = latest_ep[f"videos/{video_key}/file_index"]
# Retrieve information from the latest updated video file using latest_episode
latest_ep = self.meta.latest_episode
chunk_idx = latest_ep[f"videos/{video_key}/chunk_index"][0]
file_idx = latest_ep[f"videos/{video_key}/file_index"][0]
latest_path = self.root / self.meta.video_path.format(
video_key=video_key, chunk_index=chunk_idx, file_index=file_idx
)
latest_size_in_mb = get_video_size_in_mb(latest_path)
latest_duration_in_s = get_video_duration_in_s(latest_path)
latest_size_in_mb = get_file_size_in_mb(latest_path)
latest_duration_in_s = latest_ep[f"videos/{video_key}/to_timestamp"][0]
if latest_size_in_mb + ep_size_in_mb >= self.meta.video_files_size_in_mb:
# Move temporary episode video to a new video file in the dataset
@@ -1315,6 +1470,12 @@ class LeRobotDataset(torch.utils.data.Dataset):
obj.delta_timestamps = None
obj.delta_indices = None
obj.video_backend = video_backend if video_backend is not None else get_safe_default_codec()
obj.writer = None
obj.latest_episode = None
# Initialize tracking for incremental recording
obj._lazy_loading = False
obj._recorded_frames = 0
obj._writer_closed_for_reading = False
return obj
+13 -14
View File
@@ -30,7 +30,7 @@ import pandas
import pandas as pd
import pyarrow.parquet as pq
import torch
from datasets import Dataset, concatenate_datasets
from datasets import Dataset
from datasets.table import embed_table_storage
from huggingface_hub import DatasetCard, DatasetCardData, HfApi
from huggingface_hub.errors import RevisionNotFoundError
@@ -44,7 +44,7 @@ from lerobot.datasets.backward_compatibility import (
ForwardCompatibilityError,
)
from lerobot.utils.constants import ACTION, OBS_ENV_STATE, OBS_STR
from lerobot.utils.utils import is_valid_numpy_dtype_string
from lerobot.utils.utils import SuppressProgressBars, is_valid_numpy_dtype_string
DEFAULT_CHUNK_SIZE = 1000 # Max number of files per chunk
DEFAULT_DATA_FILE_SIZE_IN_MB = 100 # Max size per file
@@ -94,12 +94,6 @@ def get_hf_dataset_size_in_mb(hf_ds: Dataset) -> int:
return hf_ds.data.nbytes // (1024**2)
def get_hf_dataset_cache_dir(hf_ds: Dataset) -> Path | None:
if hf_ds.cache_files is None or len(hf_ds.cache_files) == 0:
return None
return Path(hf_ds.cache_files[0]["filename"]).parents[2]
def update_chunk_file_indices(chunk_idx: int, file_idx: int, chunks_size: int) -> tuple[int, int]:
if file_idx == chunks_size - 1:
file_idx = 0
@@ -123,8 +117,9 @@ def load_nested_dataset(pq_dir: Path, features: datasets.Features | None = None)
raise FileNotFoundError(f"Provided directory does not contain any parquet file: {pq_dir}")
# TODO(rcadene): set num_proc to accelerate conversion to pyarrow
datasets = [Dataset.from_parquet(str(path), features=features) for path in paths]
return concatenate_datasets(datasets)
with SuppressProgressBars():
datasets = Dataset.from_parquet([str(path) for path in paths], features=features)
return datasets
def get_parquet_num_frames(parquet_path: str | Path) -> int:
@@ -132,10 +127,14 @@ def get_parquet_num_frames(parquet_path: str | Path) -> int:
return metadata.num_rows
def get_video_size_in_mb(mp4_path: Path) -> float:
file_size_bytes = mp4_path.stat().st_size
file_size_mb = file_size_bytes / (1024**2)
return file_size_mb
def get_file_size_in_mb(file_path: Path) -> float:
"""Get file size on disk in megabytes.
Args:
file_path (Path): Path to the file.
"""
file_size_bytes = file_path.stat().st_size
return file_size_bytes / (1024**2)
def flatten_dict(d: dict, parent_key: str = "", sep: str = "/") -> dict:
+6 -5
View File
@@ -451,11 +451,9 @@ def concatenate_video_files(
stream_map[input_stream.index] = output_container.add_stream_from_template(
template=input_stream, opaque=True
)
stream_map[
input_stream.index
].time_base = (
input_stream.time_base
) # set the time base to the input stream time base (missing in the codec context)
# set the time base to the input stream time base (missing in the codec context)
stream_map[input_stream.index].time_base = input_stream.time_base
# Demux + remux packets (no re-encode)
for packet in input_container.demux():
@@ -644,6 +642,9 @@ class VideoEncodingManager:
)
self.dataset._batch_save_episode_video(start_ep, end_ep)
# Finalize the dataset to properly close all writers
self.dataset.finalize()
# Clean up episode images if recording was interrupted
if exc_type is not None:
interrupted_episode_index = self.dataset.num_episodes
+19 -7
View File
@@ -50,6 +50,8 @@ class AlohaEnv(EnvConfig):
fps: int = 50
episode_length: int = 400
obs_type: str = "pixels_agent_pos"
observation_height: int = 480
observation_width: int = 640
render_mode: str = "rgb_array"
features: dict[str, PolicyFeature] = field(
default_factory=lambda: {
@@ -67,10 +69,14 @@ class AlohaEnv(EnvConfig):
def __post_init__(self):
if self.obs_type == "pixels":
self.features["top"] = PolicyFeature(type=FeatureType.VISUAL, shape=(480, 640, 3))
self.features["top"] = PolicyFeature(
type=FeatureType.VISUAL, shape=(self.observation_height, self.observation_width, 3)
)
elif self.obs_type == "pixels_agent_pos":
self.features["agent_pos"] = PolicyFeature(type=FeatureType.STATE, shape=(14,))
self.features["pixels/top"] = PolicyFeature(type=FeatureType.VISUAL, shape=(480, 640, 3))
self.features["pixels/top"] = PolicyFeature(
type=FeatureType.VISUAL, shape=(self.observation_height, self.observation_width, 3)
)
@property
def gym_kwargs(self) -> dict:
@@ -91,6 +97,8 @@ class PushtEnv(EnvConfig):
render_mode: str = "rgb_array"
visualization_width: int = 384
visualization_height: int = 384
observation_height: int = 384
observation_width: int = 384
features: dict[str, PolicyFeature] = field(
default_factory=lambda: {
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(2,)),
@@ -108,7 +116,9 @@ class PushtEnv(EnvConfig):
def __post_init__(self):
if self.obs_type == "pixels_agent_pos":
self.features["pixels"] = PolicyFeature(type=FeatureType.VISUAL, shape=(384, 384, 3))
self.features["pixels"] = PolicyFeature(
type=FeatureType.VISUAL, shape=(self.observation_height, self.observation_width, 3)
)
elif self.obs_type == "environment_state_agent_pos":
self.features["environment_state"] = PolicyFeature(type=FeatureType.ENV, shape=(16,))
@@ -255,6 +265,8 @@ class LiberoEnv(EnvConfig):
camera_name: str = "agentview_image,robot0_eye_in_hand_image"
init_states: bool = True
camera_name_mapping: dict[str, str] | None = None
observation_height: int = 360
observation_width: int = 360
features: dict[str, PolicyFeature] = field(
default_factory=lambda: {
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(7,)),
@@ -272,18 +284,18 @@ class LiberoEnv(EnvConfig):
def __post_init__(self):
if self.obs_type == "pixels":
self.features["pixels/agentview_image"] = PolicyFeature(
type=FeatureType.VISUAL, shape=(360, 360, 3)
type=FeatureType.VISUAL, shape=(self.observation_height, self.observation_width, 3)
)
self.features["pixels/robot0_eye_in_hand_image"] = PolicyFeature(
type=FeatureType.VISUAL, shape=(360, 360, 3)
type=FeatureType.VISUAL, shape=(self.observation_height, self.observation_width, 3)
)
elif self.obs_type == "pixels_agent_pos":
self.features["agent_pos"] = PolicyFeature(type=FeatureType.STATE, shape=(8,))
self.features["pixels/agentview_image"] = PolicyFeature(
type=FeatureType.VISUAL, shape=(360, 360, 3)
type=FeatureType.VISUAL, shape=(self.observation_height, self.observation_width, 3)
)
self.features["pixels/robot0_eye_in_hand_image"] = PolicyFeature(
type=FeatureType.VISUAL, shape=(360, 360, 3)
type=FeatureType.VISUAL, shape=(self.observation_height, self.observation_width, 3)
)
else:
raise ValueError(f"Unsupported obs_type: {self.obs_type}")
+16
View File
@@ -1 +1,17 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .motors_bus import Motor, MotorCalibration, MotorNormMode, MotorsBus
@@ -45,7 +45,7 @@ class DiffusionConfig(PreTrainedConfig):
Args:
n_obs_steps: Number of environment steps worth of observations to pass to the policy (takes the
current step and additional steps going back).
horizon: Diffusion model action prediction size as detailed in `DiffusionPolicy.select_action`.
chunk_size: Diffusion model action prediction size as detailed in `DiffusionPolicy.select_action`.
n_action_steps: The number of action steps to run in the environment for one invocation of the policy.
See `DiffusionPolicy.select_action` for more details.
input_shapes: A dictionary defining the shapes of the input data for the policy. The key represents
@@ -105,7 +105,7 @@ class DiffusionConfig(PreTrainedConfig):
# Inputs / output structure.
n_obs_steps: int = 2
horizon: int = 16
chunk_size: int = 16
n_action_steps: int = 8
normalization_mapping: dict[str, NormalizationMode] = field(
@@ -118,7 +118,7 @@ class DiffusionConfig(PreTrainedConfig):
# The original implementation doesn't sample frames for the last 7 steps,
# which avoids excessive padding and leads to improved training results.
drop_n_last_frames: int = 7 # horizon - n_action_steps - n_obs_steps + 1
drop_n_last_frames: int = 7 # chunk_size - n_action_steps - n_obs_steps + 1
# Architecture / modeling.
# Vision backbone.
@@ -180,13 +180,13 @@ class DiffusionConfig(PreTrainedConfig):
f"Got {self.noise_scheduler_type}."
)
# Check that the horizon size and U-Net downsampling is compatible.
# Check that the chunk size and U-Net downsampling is compatible.
# U-Net downsamples by 2 with each stage.
downsampling_factor = 2 ** len(self.down_dims)
if self.horizon % downsampling_factor != 0:
if self.chunk_size % downsampling_factor != 0:
raise ValueError(
"The horizon should be an integer multiple of the downsampling factor (which is determined "
f"by `len(down_dims)`). Got {self.horizon=} and {self.down_dims=}"
"The chunk_size should be an integer multiple of the downsampling factor (which is determined "
f"by `len(down_dims)`). Got {self.chunk_size=} and {self.down_dims=}"
)
def get_optimizer_preset(self) -> AdamConfig:
@@ -231,7 +231,7 @@ class DiffusionConfig(PreTrainedConfig):
@property
def action_delta_indices(self) -> list:
return list(range(1 - self.n_obs_steps, 1 - self.n_obs_steps + self.horizon))
return list(range(1 - self.n_obs_steps, 1 - self.n_obs_steps + self.chunk_size))
@property
def reward_delta_indices(self) -> None:
@@ -99,25 +99,25 @@ class DiffusionPolicy(PreTrainedPolicy):
return actions
@torch.no_grad()
def select_action(self, batch: dict[str, Tensor], noise: Tensor | None = None) -> Tensor:
def select_action(self, batch: dict[str, Tensor], noise: Tensor | None = None, **kwargs) -> Tensor:
"""Select a single action given environment observations.
This method handles caching a history of observations and an action trajectory generated by the
underlying diffusion model. Here's how it works:
- `n_obs_steps` steps worth of observations are cached (for the first steps, the observation is
copied `n_obs_steps` times to fill the cache).
- The diffusion model generates `horizon` steps worth of actions.
- The diffusion model generates `chunk_size` steps worth of actions.
- `n_action_steps` worth of actions are actually kept for execution, starting from the current step.
Schematically this looks like:
----------------------------------------------------------------------------------------------
(legend: o = n_obs_steps, h = horizon, a = n_action_steps)
(legend: o = n_obs_steps, c = chunk_size, a = n_action_steps)
|timestep | n-o+1 | n-o+2 | ..... | n | ..... | n+a-1 | n+a | ..... | n-o+h |
|observation is used | YES | YES | YES | YES | NO | NO | NO | NO | NO |
|action is generated | YES | YES | YES | YES | YES | YES | YES | YES | YES |
|action is used | NO | NO | NO | YES | YES | YES | NO | NO | NO |
----------------------------------------------------------------------------------------------
Note that this means we require: `n_action_steps <= horizon - n_obs_steps + 1`. Also, note that
"horizon" may not the best name to describe what the variable actually means, because this period is
Note that this means we require: `n_action_steps <= chunk_size - n_obs_steps + 1`. Also, note that
this period is
actually measured from the first observation which (if `n_obs_steps` > 1) happened in the past.
"""
# NOTE: for offline evaluation, we have action in the batch, so we need to pop it out
@@ -213,7 +213,7 @@ class DiffusionModel(nn.Module):
noise
if noise is not None
else torch.randn(
size=(batch_size, self.config.horizon, self.config.action_feature.shape[0]),
size=(batch_size, self.config.chunk_size, self.config.action_feature.shape[0]),
dtype=dtype,
device=device,
generator=generator,
@@ -309,16 +309,16 @@ class DiffusionModel(nn.Module):
AND/OR
"observation.environment_state": (B, n_obs_steps, environment_dim)
"action": (B, horizon, action_dim)
"action_is_pad": (B, horizon)
"action": (B, chunk_size, action_dim)
"action_is_pad": (B, chunk_size)
}
"""
# Input validation.
assert set(batch).issuperset({OBS_STATE, ACTION, "action_is_pad"})
assert OBS_IMAGES in batch or OBS_ENV_STATE in batch
n_obs_steps = batch[OBS_STATE].shape[1]
horizon = batch[ACTION].shape[1]
assert horizon == self.config.horizon
chunk_size = batch[ACTION].shape[1]
assert chunk_size == self.config.chunk_size
assert n_obs_steps == self.config.n_obs_steps
# Encode image features and concatenate them all together along with the state vector.
@@ -0,0 +1,244 @@
# !/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import NormalizationMode
from lerobot.optim.optimizers import MultiAdamConfig
from lerobot.utils.constants import ACTION, OBS_IMAGE, OBS_STATE
def is_image_feature(key: str) -> bool:
"""Check if a feature key represents an image feature.
Args:
key: The feature key to check
Returns:
True if the key represents an image feature, False otherwise
"""
return key.startswith(OBS_IMAGE)
@dataclass
class ConcurrencyConfig:
"""Configuration for the concurrency of the actor and learner.
Possible values are:
- "threads": Use threads for the actor and learner.
- "processes": Use processes for the actor and learner.
"""
actor: str = "threads"
learner: str = "threads"
@dataclass
class ActorLearnerConfig:
learner_host: str = "127.0.0.1"
learner_port: int = 50051
policy_parameters_push_frequency: int = 4
queue_get_timeout: float = 2
@dataclass
class CriticNetworkConfig:
hidden_dims: list[int] = field(default_factory=lambda: [256, 256])
activate_final: bool = True
final_activation: str | None = None
@dataclass
class ActorNetworkConfig:
hidden_dims: list[int] = field(default_factory=lambda: [256, 256])
activate_final: bool = True
use_layer_norm: bool = True
@dataclass
class NoiseActorConfig:
"""Configuration for the noise actor in DSRL.
The noise actor outputs noise that gets fed to the diffusion policy.
"""
use_tanh_squash: bool = False # Whether to bound the noise output
std_min: float = 1e-5
std_max: float = 2.0
init_final: float = 0.05
@PreTrainedConfig.register_subclass("dsrl")
@dataclass
class DSRLConfig(PreTrainedConfig):
"""Diffusion Steering via Reinforcement Learning (DSRL) configuration."""
# Mapping of feature types to normalization modes
normalization_mapping: dict[str, NormalizationMode] = field(
default_factory=lambda: {
"VISUAL": NormalizationMode.MEAN_STD,
"STATE": NormalizationMode.MIN_MAX,
"ENV": NormalizationMode.MIN_MAX,
"ACTION": NormalizationMode.MIN_MAX,
}
)
# Statistics for normalizing different types of inputs
dataset_stats: dict[str, dict[str, list[float]]] | None = field(
default_factory=lambda: {
OBS_IMAGE: {
"mean": [0.485, 0.456, 0.406],
"std": [0.229, 0.224, 0.225],
},
OBS_STATE: {
"min": [0.0, 0.0],
"max": [1.0, 1.0],
},
ACTION: {
"min": [0.0, 0.0, 0.0],
"max": [1.0, 1.0, 1.0],
},
}
)
# Architecture specifics
# Device to run the model on (e.g., "cuda", "cpu")
device: str = "cpu"
# Device to store the model on
storage_device: str = "cpu"
# Name of the vision encoder model (Set to "helper2424/resnet10" for hil serl resnet10)
vision_encoder_name: str | None = None
# Whether to freeze the vision encoder during training
freeze_vision_encoder: bool = True
# Hidden dimension size for the image encoder
image_encoder_hidden_dim: int = 32
# Whether to use a shared encoder for actor and critic
shared_encoder: bool = True
# Number of discrete actions, eg for gripper actions
num_discrete_actions: int | None = None
# Dimension of the image embedding pooling
image_embedding_pooling_dim: int = 8
# Name of the action policy
action_policy_name: str = "pi0"
action_policy_weights: str | None = "lerobot/pi0_base"
# Training parameter
# Number of steps for online training
online_steps: int = 1000000
# Number of steps for offline training
offline_steps: int = 100000
# Capacity of the online replay buffer
online_buffer_capacity: int = 100000
# Capacity of the offline replay buffer
offline_buffer_capacity: int = 100000
# Whether to use asynchronous prefetching for the buffers
async_prefetch: bool = False
# Number of steps before learning starts
online_step_before_learning: int = 100
# Frequency of policy updates
policy_update_freq: int = 1
# SAC algorithm parameters
discount: float = 0.99
# Initial temperature value
temperature_init: float = 1.0
# Number of critics in the ensemble
num_critics: int = 2
# Number of subsampled critics for training
num_subsample_critics: int | None = None
# Learning rate for the critic network
critic_lr: float = 3e-4
# Learning rate for the actor network
actor_lr: float = 3e-4
# Learning rate for the temperature parameter
temperature_lr: float = 3e-4
# Weight for the critic target update
critic_target_update_weight: float = 0.005
# Update-to-data ratio for the UTD algorithm (If you want enable utd_ratio, you need to set it to >1)
utd_ratio: int = 1
# Hidden dimension size for the state encoder
state_encoder_hidden_dim: int = 256
# Dimension of the latent space
latent_dim: int = 256
# Target entropy for the SAC algorithm
target_entropy: float | None = None
# Whether to use backup entropy for the SAC algorithm
use_backup_entropy: bool = True
# Gradient clipping norm for the SAC algorithm
grad_clip_norm: float = 40.0
# Network configuration
# Configuration for the critic network architecture
critic_network_kwargs: CriticNetworkConfig = field(default_factory=CriticNetworkConfig)
# Configuration for the noise critic network architecture
noise_critic_network_kwargs: CriticNetworkConfig = field(default_factory=CriticNetworkConfig)
# Configuration for the noise actor network architecture
noise_actor_network_kwargs: ActorNetworkConfig = field(default_factory=ActorNetworkConfig)
# Configuration for the noise actor specific parameters
noise_actor_kwargs: NoiseActorConfig = field(default_factory=NoiseActorConfig)
# Configuration for actor-learner architecture
actor_learner_config: ActorLearnerConfig = field(default_factory=ActorLearnerConfig)
# Configuration for concurrency settings (you can use threads or processes for the actor and learner)
concurrency: ConcurrencyConfig = field(default_factory=ConcurrencyConfig)
# Optimizations
use_torch_compile: bool = True
def __post_init__(self):
super().__post_init__()
def get_optimizer_preset(self) -> MultiAdamConfig:
return MultiAdamConfig(
weight_decay=0.0,
optimizer_groups={
"critic_action": {"lr": self.critic_lr},
"critic_noise": {"lr": self.critic_lr},
"noise_actor": {"lr": self.actor_lr},
"temperature": {"lr": self.temperature_lr},
},
)
def get_scheduler_preset(self) -> None:
return None
def validate_features(self) -> None:
has_image = any(is_image_feature(key) for key in self.input_features)
has_state = OBS_STATE in self.input_features
if not (has_state or has_image):
raise ValueError(
"You must provide either 'observation.state' or an image observation (key starting with 'observation.image') in the input features"
)
if ACTION not in self.output_features:
raise ValueError("You must provide 'action' in the output features")
@property
def image_features(self) -> list[str]:
return [key for key in self.input_features if is_image_feature(key)]
@property
def observation_delta_indices(self) -> list:
return None
@property
def action_delta_indices(self) -> list:
return None
@property
def reward_delta_indices(self) -> None:
return None
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,89 @@
# !/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor for DSRL policy.
DSRL uses a similar processing pipeline as SAC since it operates on
state-action transitions. The main difference is that internally it
also works with noise, but that's handled within the policy itself.
"""
from typing import Any
import torch
from lerobot.policies.dsrl.configuration_dsrl import DSRLConfig
from lerobot.processor import (
AddBatchDimensionProcessorStep,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyAction,
PolicyProcessorPipeline,
RenameObservationsProcessorStep,
UnnormalizerProcessorStep,
)
from lerobot.processor.converters import (
policy_action_to_transition,
transition_to_policy_action,
)
from lerobot.utils.constants import POLICY_POSTPROCESSOR_DEFAULT_NAME, POLICY_PREPROCESSOR_DEFAULT_NAME
def make_dsrl_pre_post_processors(
config: DSRLConfig,
dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
) -> tuple[
PolicyProcessorPipeline[dict, dict],
PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
"""Create preprocessor and postprocessor pipelines for DSRL policy.
Args:
config: DSRL policy configuration
dataset_stats: Optional dataset statistics for normalization
Returns:
Tuple of (preprocessor, postprocessor) pipelines
"""
input_steps = [
RenameObservationsProcessorStep(rename_map={}),
AddBatchDimensionProcessorStep(),
DeviceProcessorStep(device=config.device),
NormalizerProcessorStep(
features={**config.input_features, **config.output_features},
norm_map=config.normalization_mapping,
stats=dataset_stats,
),
]
output_steps = [
UnnormalizerProcessorStep(
features=config.output_features, norm_map=config.normalization_mapping, stats=dataset_stats
),
DeviceProcessorStep(device="cpu"),
]
return (
PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
steps=input_steps,
name=POLICY_PREPROCESSOR_DEFAULT_NAME,
),
PolicyProcessorPipeline[PolicyAction, PolicyAction](
steps=output_steps,
name=POLICY_POSTPROCESSOR_DEFAULT_NAME,
to_transition=policy_action_to_transition,
to_output=transition_to_policy_action,
),
)
+16 -2
View File
@@ -30,6 +30,7 @@ from lerobot.envs.configs import EnvConfig
from lerobot.envs.utils import env_to_policy_features
from lerobot.policies.act.configuration_act import ACTConfig
from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig
from lerobot.policies.dsrl.configuration_dsrl import DSRLConfig
from lerobot.policies.pi0.configuration_pi0 import PI0Config
from lerobot.policies.pi0fast.configuration_pi0fast import PI0FASTConfig
from lerobot.policies.pi05.configuration_pi05 import PI05Config
@@ -58,7 +59,7 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
Args:
name: The name of the policy. Supported names are "tdmpc", "diffusion", "act",
"vqbet", "pi0", "pi0fast", "sac", "reward_classifier", "smolvla".
"vqbet", "pi0", "pi0fast", "sac", "reward_classifier", "smolvla", "dsrl".
Returns:
The policy class corresponding to the given name.
@@ -106,6 +107,10 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
from lerobot.policies.smolvla.modeling_smolvla import SmolVLAPolicy
return SmolVLAPolicy
elif name == "dsrl":
from lerobot.policies.dsrl.modeling_dsrl import DSRLPolicy
return DSRLPolicy
else:
raise NotImplementedError(f"Policy with name {name} is not implemented.")
@@ -120,7 +125,7 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
Args:
policy_type: The type of the policy. Supported types include "tdmpc",
"diffusion", "act", "vqbet", "pi0", "pi0fast", "sac", "smolvla",
"reward_classifier".
"reward_classifier", "dsrl".
**kwargs: Keyword arguments to be passed to the configuration class constructor.
Returns:
@@ -149,6 +154,8 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
return SmolVLAConfig(**kwargs)
elif policy_type == "reward_classifier":
return RewardClassifierConfig(**kwargs)
elif policy_type == "dsrl":
return DSRLConfig(**kwargs)
else:
raise ValueError(f"Policy type '{policy_type}' is not available.")
@@ -307,6 +314,13 @@ def make_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, DSRLConfig):
from lerobot.policies.dsrl.processor_dsrl import make_dsrl_pre_post_processors
processors = make_dsrl_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
else:
raise NotImplementedError(f"Processor for policy type '{policy_cfg.type}' is not implemented.")
+2 -2
View File
@@ -1148,7 +1148,7 @@ class PI0Policy(PreTrainedPolicy):
return self._action_queue.popleft()
@torch.no_grad()
def predict_action_chunk(self, batch: dict[str, Tensor]) -> Tensor:
def predict_action_chunk(self, batch: dict[str, Tensor], noise: Tensor | None = None) -> Tensor:
"""Predict a chunk of actions given environment observations."""
self.eval()
@@ -1158,7 +1158,7 @@ class PI0Policy(PreTrainedPolicy):
state = self.prepare_state(batch)
# Sample actions using the model
actions = self.model.sample_actions(images, img_masks, lang_tokens, lang_masks, state)
actions = self.model.sample_actions(images, img_masks, lang_tokens, lang_masks, state, noise)
# Unpad actions to actual action dimension
original_action_dim = self.config.output_features[ACTION].shape[0]
+2 -2
View File
@@ -1120,7 +1120,7 @@ class PI05Policy(PreTrainedPolicy):
return self._action_queue.popleft()
@torch.no_grad()
def predict_action_chunk(self, batch: dict[str, Tensor]) -> Tensor:
def predict_action_chunk(self, batch: dict[str, Tensor], noise: Tensor | None = None) -> Tensor:
"""Predict a chunk of actions given environment observations."""
self.eval()
@@ -1129,7 +1129,7 @@ class PI05Policy(PreTrainedPolicy):
tokens, masks = batch[f"{OBS_LANGUAGE_TOKENS}"], batch[f"{OBS_LANGUAGE_ATTENTION_MASK}"]
# Sample actions using the model (no separate state needed for PI05)
actions = self.model.sample_actions(images, img_masks, tokens, masks)
actions = self.model.sample_actions(images, img_masks, tokens, masks, noise)
# Unpad actions to actual action dimension
original_action_dim = self.config.output_features[ACTION].shape[0]
@@ -1,3 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import asdict, dataclass
from typing import Any
+11 -5
View File
@@ -86,7 +86,7 @@ class ReplayBuffer:
image_augmentation_function: Callable | None = None,
use_drq: bool = True,
storage_device: str = "cpu",
optimize_memory: bool = False,
optimize_memory: bool = True
):
"""
Replay buffer for storing transitions.
@@ -136,6 +136,7 @@ class ReplayBuffer:
complementary_info: dict[str, torch.Tensor] | None = None,
):
"""Initialize the storage tensors based on the first transition."""
self.capacity = 1000
# Determine shapes from the first transition
state_shapes = {key: val.squeeze(0).shape for key, val in state.items()}
action_shape = action.squeeze(0).shape
@@ -444,7 +445,7 @@ class ReplayBuffer:
if capacity is None:
capacity = len(lerobot_dataset)
if capacity < len(lerobot_dataset):
if capacity < 1000: #len(lerobot_dataset):
raise ValueError(
"The capacity of the ReplayBuffer must be greater than or equal to the length of the LeRobotDataset."
)
@@ -476,13 +477,14 @@ class ReplayBuffer:
and first_transition["complementary_info"] is not None
):
first_complementary_info = {
k: v.to(device) for k, v in first_transition["complementary_info"].items()
k: v.to for k, v in first_transition["complementary_info"].items()
}
replay_buffer._initialize_storage(
state=first_state, action=first_action, complementary_info=first_complementary_info
)
num_samples = 0
# Fill the buffer with all transitions
for data in list_transition:
for k, v in data.items():
@@ -503,6 +505,9 @@ class ReplayBuffer:
truncated=False, # NOTE: Truncation are not supported yet in lerobot dataset
complementary_info=data.get("complementary_info", None),
)
num_samples += 1
if num_samples >= 1000:
return replay_buffer
return replay_buffer
@@ -607,6 +612,7 @@ class ReplayBuffer:
lerobot_dataset.save_episode()
lerobot_dataset.stop_image_writer()
lerobot_dataset.finalize()
return lerobot_dataset
@@ -644,7 +650,7 @@ class ReplayBuffer:
raise ValueError("State keys must be provided when converting LeRobotDataset to Transitions.")
transitions = []
num_frames = len(dataset)
num_frames = 1000 # len(dataset)
# Check if the dataset has "next.done" key
sample = dataset[0]
@@ -658,7 +664,7 @@ class ReplayBuffer:
if not has_done_key:
print("'next.done' key not found in dataset. Inferring from episode boundaries...")
for i in tqdm(range(num_frames)):
for i in tqdm(range(1000)): # num_frames)):
current_sample = dataset[i]
# ----- 1) Current state -----
+1 -1
View File
@@ -696,7 +696,7 @@ def control_loop(
episode_idx += 1
if dataset is not None:
if transition[TransitionKey.INFO].get("rerecord_episode", False):
if transition[TransitionKey.INFO].get(TeleopEvents.RERECORD_EPISODE, False):
logging.info(f"Re-recording episode {episode_idx}")
dataset.clear_episode_buffer()
episode_idx -= 1
+16
View File
@@ -1,3 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config import RobotConfig
from .robot import Robot
from .utils import make_robot_from_config
+8 -2
View File
@@ -14,13 +14,16 @@
import logging
from pprint import pformat
from typing import cast
from lerobot.robots import RobotConfig
from lerobot.utils.import_utils import make_device_from_device_class
from .config import RobotConfig
from .robot import Robot
def make_robot_from_config(config: RobotConfig) -> Robot:
# TODO(Steven): Consider just using the make_device_from_device_class for all types
if config.type == "koch_follower":
from .koch_follower import KochFollower
@@ -66,7 +69,10 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
return MockRobot(config)
else:
raise ValueError(config.type)
try:
return cast(Robot, make_device_from_device_class(config))
except Exception as e:
raise ValueError(f"Error creating robot with config {config}: {e}") from e
# TODO(pepijn): Move to pipeline step to make sure we don't have to do this in the robot code and send action to robot is clean for use in dataset
+2
View File
@@ -52,6 +52,7 @@ from lerobot.teleoperators import ( # noqa: F401
so100_leader,
so101_leader,
)
from lerobot.utils.import_utils import register_third_party_devices
from lerobot.utils.utils import init_logging
@@ -83,6 +84,7 @@ def calibrate(cfg: CalibrateConfig):
def main():
register_third_party_devices()
calibrate()
+286
View File
@@ -0,0 +1,286 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Edit LeRobot datasets using various transformation tools.
This script allows you to delete episodes, split datasets, merge datasets,
and remove features. When new_repo_id is specified, creates a new dataset.
Usage Examples:
Delete episodes 0, 2, and 5 from a dataset:
python -m lerobot.scripts.lerobot_edit_dataset \
--repo_id lerobot/pusht \
--operation.type delete_episodes \
--operation.episode_indices "[0, 2, 5]"
Delete episodes and save to a new dataset:
python -m lerobot.scripts.lerobot_edit_dataset \
--repo_id lerobot/pusht \
--new_repo_id lerobot/pusht_filtered \
--operation.type delete_episodes \
--operation.episode_indices "[0, 2, 5]"
Split dataset by fractions:
python -m lerobot.scripts.lerobot_edit_dataset \
--repo_id lerobot/pusht \
--operation.type split \
--operation.splits '{"train": 0.8, "val": 0.2}'
Split dataset by episode indices:
python -m lerobot.scripts.lerobot_edit_dataset \
--repo_id lerobot/pusht \
--operation.type split \
--operation.splits '{"train": [0, 1, 2, 3], "val": [4, 5]}'
Split into more than two splits:
python -m lerobot.scripts.lerobot_edit_dataset \
--repo_id lerobot/pusht \
--operation.type split \
--operation.splits '{"train": 0.6, "val": 0.2, "test": 0.2}'
Merge multiple datasets:
python -m lerobot.scripts.lerobot_edit_dataset \
--repo_id lerobot/pusht_merged \
--operation.type merge \
--operation.repo_ids "['lerobot/pusht_train', 'lerobot/pusht_val']"
Remove camera feature:
python -m lerobot.scripts.lerobot_edit_dataset \
--repo_id lerobot/pusht \
--operation.type remove_feature \
--operation.feature_names "['observation.images.top']"
Using JSON config file:
python -m lerobot.scripts.lerobot_edit_dataset \
--config_path path/to/edit_config.json
"""
import logging
import shutil
from dataclasses import dataclass
from pathlib import Path
from lerobot.configs import parser
from lerobot.datasets.dataset_tools import (
delete_episodes,
merge_datasets,
remove_feature,
split_dataset,
)
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.constants import HF_LEROBOT_HOME
from lerobot.utils.utils import init_logging
@dataclass
class DeleteEpisodesConfig:
type: str = "delete_episodes"
episode_indices: list[int] | None = None
@dataclass
class SplitConfig:
type: str = "split"
splits: dict[str, float | list[int]] | None = None
@dataclass
class MergeConfig:
type: str = "merge"
repo_ids: list[str] | None = None
@dataclass
class RemoveFeatureConfig:
type: str = "remove_feature"
feature_names: list[str] | None = None
@dataclass
class EditDatasetConfig:
repo_id: str
operation: DeleteEpisodesConfig | SplitConfig | MergeConfig | RemoveFeatureConfig
root: str | None = None
new_repo_id: str | None = None
push_to_hub: bool = False
def get_output_path(repo_id: str, new_repo_id: str | None, root: Path | None) -> tuple[str, Path]:
if new_repo_id:
output_repo_id = new_repo_id
output_dir = root / new_repo_id if root else HF_LEROBOT_HOME / new_repo_id
else:
output_repo_id = repo_id
dataset_path = root / repo_id if root else HF_LEROBOT_HOME / repo_id
old_path = Path(str(dataset_path) + "_old")
if dataset_path.exists():
if old_path.exists():
shutil.rmtree(old_path)
shutil.move(str(dataset_path), str(old_path))
output_dir = dataset_path
return output_repo_id, output_dir
def handle_delete_episodes(cfg: EditDatasetConfig) -> None:
if not isinstance(cfg.operation, DeleteEpisodesConfig):
raise ValueError("Operation config must be DeleteEpisodesConfig")
if not cfg.operation.episode_indices:
raise ValueError("episode_indices must be specified for delete_episodes operation")
dataset = LeRobotDataset(cfg.repo_id, root=cfg.root)
output_repo_id, output_dir = get_output_path(
cfg.repo_id, cfg.new_repo_id, Path(cfg.root) if cfg.root else None
)
if cfg.new_repo_id is None:
dataset.root = Path(str(dataset.root) + "_old")
logging.info(f"Deleting episodes {cfg.operation.episode_indices} from {cfg.repo_id}")
new_dataset = delete_episodes(
dataset,
episode_indices=cfg.operation.episode_indices,
output_dir=output_dir,
repo_id=output_repo_id,
)
logging.info(f"Dataset saved to {output_dir}")
logging.info(f"Episodes: {new_dataset.meta.total_episodes}, Frames: {new_dataset.meta.total_frames}")
if cfg.push_to_hub:
logging.info(f"Pushing to hub as {output_repo_id}")
LeRobotDataset(output_repo_id, root=output_dir).push_to_hub()
def handle_split(cfg: EditDatasetConfig) -> None:
if not isinstance(cfg.operation, SplitConfig):
raise ValueError("Operation config must be SplitConfig")
if not cfg.operation.splits:
raise ValueError(
"splits dict must be specified with split names as keys and fractions/episode lists as values"
)
dataset = LeRobotDataset(cfg.repo_id, root=cfg.root)
logging.info(f"Splitting dataset {cfg.repo_id} with splits: {cfg.operation.splits}")
split_datasets = split_dataset(dataset, splits=cfg.operation.splits)
for split_name, split_ds in split_datasets.items():
split_repo_id = f"{cfg.repo_id}_{split_name}"
logging.info(
f"{split_name}: {split_ds.meta.total_episodes} episodes, {split_ds.meta.total_frames} frames"
)
if cfg.push_to_hub:
logging.info(f"Pushing {split_name} split to hub as {split_repo_id}")
LeRobotDataset(split_ds.repo_id, root=split_ds.root).push_to_hub()
def handle_merge(cfg: EditDatasetConfig) -> None:
if not isinstance(cfg.operation, MergeConfig):
raise ValueError("Operation config must be MergeConfig")
if not cfg.operation.repo_ids:
raise ValueError("repo_ids must be specified for merge operation")
if not cfg.repo_id:
raise ValueError("repo_id must be specified as the output repository for merged dataset")
logging.info(f"Loading {len(cfg.operation.repo_ids)} datasets to merge")
datasets = [LeRobotDataset(repo_id, root=cfg.root) for repo_id in cfg.operation.repo_ids]
output_dir = Path(cfg.root) / cfg.repo_id if cfg.root else HF_LEROBOT_HOME / cfg.repo_id
logging.info(f"Merging datasets into {cfg.repo_id}")
merged_dataset = merge_datasets(
datasets,
output_repo_id=cfg.repo_id,
output_dir=output_dir,
)
logging.info(f"Merged dataset saved to {output_dir}")
logging.info(
f"Episodes: {merged_dataset.meta.total_episodes}, Frames: {merged_dataset.meta.total_frames}"
)
if cfg.push_to_hub:
logging.info(f"Pushing to hub as {cfg.repo_id}")
LeRobotDataset(merged_dataset.repo_id, root=output_dir).push_to_hub()
def handle_remove_feature(cfg: EditDatasetConfig) -> None:
if not isinstance(cfg.operation, RemoveFeatureConfig):
raise ValueError("Operation config must be RemoveFeatureConfig")
if not cfg.operation.feature_names:
raise ValueError("feature_names must be specified for remove_feature operation")
dataset = LeRobotDataset(cfg.repo_id, root=cfg.root)
output_repo_id, output_dir = get_output_path(
cfg.repo_id, cfg.new_repo_id, Path(cfg.root) if cfg.root else None
)
if cfg.new_repo_id is None:
dataset.root = Path(str(dataset.root) + "_old")
logging.info(f"Removing features {cfg.operation.feature_names} from {cfg.repo_id}")
new_dataset = remove_feature(
dataset,
feature_names=cfg.operation.feature_names,
output_dir=output_dir,
repo_id=output_repo_id,
)
logging.info(f"Dataset saved to {output_dir}")
logging.info(f"Remaining features: {list(new_dataset.meta.features.keys())}")
if cfg.push_to_hub:
logging.info(f"Pushing to hub as {output_repo_id}")
LeRobotDataset(output_repo_id, root=output_dir).push_to_hub()
@parser.wrap()
def edit_dataset(cfg: EditDatasetConfig) -> None:
operation_type = cfg.operation.type
if operation_type == "delete_episodes":
handle_delete_episodes(cfg)
elif operation_type == "split":
handle_split(cfg)
elif operation_type == "merge":
handle_merge(cfg)
elif operation_type == "remove_feature":
handle_remove_feature(cfg)
else:
raise ValueError(
f"Unknown operation type: {operation_type}\n"
f"Available operations: delete_episodes, split, merge, remove_feature"
)
def main() -> None:
init_logging()
edit_dataset()
if __name__ == "__main__":
main()
+2
View File
@@ -117,6 +117,7 @@ from lerobot.utils.control_utils import (
sanity_check_dataset_name,
sanity_check_dataset_robot_compatibility,
)
from lerobot.utils.import_utils import register_third_party_devices
from lerobot.utils.robot_utils import busy_wait
from lerobot.utils.utils import (
get_safe_torch_device,
@@ -513,6 +514,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
def main():
register_third_party_devices()
record()
+2
View File
@@ -61,6 +61,7 @@ from lerobot.robots import ( # noqa: F401
so101_follower,
)
from lerobot.utils.constants import ACTION
from lerobot.utils.import_utils import register_third_party_devices
from lerobot.utils.robot_utils import busy_wait
from lerobot.utils.utils import (
init_logging,
@@ -126,6 +127,7 @@ def replay(cfg: ReplayConfig):
def main():
register_third_party_devices()
replay()
@@ -88,6 +88,7 @@ from lerobot.teleoperators import ( # noqa: F401
so100_leader,
so101_leader,
)
from lerobot.utils.import_utils import register_third_party_devices
from lerobot.utils.robot_utils import busy_wait
from lerobot.utils.utils import init_logging, move_cursor_up
from lerobot.utils.visualization_utils import init_rerun, log_rerun_data
@@ -215,6 +216,7 @@ def teleoperate(cfg: TeleoperateConfig):
def main():
register_third_party_devices()
teleoperate()
@@ -102,21 +102,12 @@ class KeyboardTeleop(Teleoperator):
pass
def _on_press(self, key):
# Capture both character keys and special keys (arrows, shift, ctrl, etc.)
if hasattr(key, "char"):
self.event_queue.put((key.char, True))
else:
# Capture special keys directly (keyboard.Key enum values)
self.event_queue.put((key, True))
def _on_release(self, key):
# Capture both character keys and special keys
if hasattr(key, "char"):
self.event_queue.put((key.char, False))
else:
# Capture special keys directly (keyboard.Key enum values)
self.event_queue.put((key, False))
if key == keyboard.Key.esc:
logging.info("ESC pressed, disconnecting.")
self.disconnect()
@@ -223,6 +214,8 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
# this is useful for retrieving other events like interventions for RL, episode success, etc.
self.misc_keys_queue.put(key)
self.current_pressed.clear()
action_dict = {
"delta_x": delta_x,
"delta_y": delta_y,
@@ -290,8 +283,6 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
terminate_episode = True
success = False
self.current_pressed.clear()
return {
TeleopEvents.IS_INTERVENTION: is_intervention,
TeleopEvents.TERMINATE_EPISODE: terminate_episode,
+8 -1
View File
@@ -13,6 +13,9 @@
# limitations under the License.
from enum import Enum
from typing import cast
from lerobot.utils.import_utils import make_device_from_device_class
from .config import TeleoperatorConfig
from .teleoperator import Teleoperator
@@ -29,6 +32,7 @@ class TeleopEvents(Enum):
def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
# TODO(Steven): Consider just using the make_device_from_device_class for all types
if config.type == "keyboard":
from .keyboard import KeyboardTeleop
@@ -82,4 +86,7 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
return Reachy2Teleoperator(config)
else:
raise ValueError(config.type)
try:
return cast(Teleoperator, make_device_from_device_class(config))
except Exception as e:
raise ValueError(f"Error creating robot with config {config}: {e}") from e
+94
View File
@@ -15,6 +15,10 @@
# limitations under the License.
import importlib
import logging
import pkgutil
from typing import Any
from draccus.choice_types import ChoiceRegistry
def is_package_available(pkg_name: str, return_version: bool = False) -> tuple[bool, str] | bool:
@@ -58,3 +62,93 @@ def is_package_available(pkg_name: str, return_version: bool = False) -> tuple[b
_transformers_available = is_package_available("transformers")
def make_device_from_device_class(config: ChoiceRegistry) -> Any:
"""
Dynamically instantiates an object from its `ChoiceRegistry` configuration.
This factory uses the module path and class name from the `config` object's
type to locate and instantiate the corresponding device class (not the config).
It derives the device class name by removing a trailing 'Config' from the config
class name and tries a few candidate modules where the device implementation is
commonly located.
"""
if not isinstance(config, ChoiceRegistry):
raise ValueError(f"Config should be an instance of `ChoiceRegistry`, got {type(config)}")
config_cls = config.__class__
module_path = config_cls.__module__ # typical: lerobot_teleop_mydevice.config_mydevice
config_name = config_cls.__name__ # typical: MyDeviceConfig
# Derive device class name (strip "Config")
if not config_name.endswith("Config"):
raise ValueError(f"Config class name '{config_name}' does not end with 'Config'")
device_class_name = config_name[:-6] # typical: MyDeviceConfig -> MyDevice
# Build candidate modules to search for the device class
parts = module_path.split(".")
parent_module = ".".join(parts[:-1]) if len(parts) > 1 else module_path
candidates = [
parent_module, # typical: lerobot_teleop_mydevice
parent_module + "." + device_class_name.lower(), # typical: lerobot_teleop_mydevice.mydevice
]
# handle modules named like "config_xxx" -> try replacing that piece with "xxx"
last = parts[-1] if parts else ""
if last.startswith("config_"):
candidates.append(".".join(parts[:-1] + [last.replace("config_", "")]))
# de-duplicate while preserving order
seen: set[str] = set()
candidates = [c for c in candidates if not (c in seen or seen.add(c))]
tried: list[str] = []
for candidate in candidates:
tried.append(candidate)
try:
module = importlib.import_module(candidate)
except ImportError:
continue
if hasattr(module, device_class_name):
cls = getattr(module, device_class_name)
if callable(cls):
try:
return cls(config)
except TypeError as e:
raise TypeError(
f"Failed to instantiate '{device_class_name}' from module '{candidate}': {e}"
) from e
raise ImportError(
f"Could not locate device class '{device_class_name}' for config '{config_name}'. "
f"Tried modules: {tried}. Ensure your device class name is the config class name without "
f"'Config' and that it's importable from one of those modules."
)
def register_third_party_devices() -> None:
"""
Discover and import third-party lerobot_* plugins so they can register themselves.
Scans top-level modules on sys.path for packages starting with
'lerobot_robot_', 'lerobot_camera_' or 'lerobot_teleoperator_' and imports them.
"""
prefixes = ("lerobot_robot_", "lerobot_camera_", "lerobot_teleoperator_")
imported: list[str] = []
failed: list[str] = []
for module_info in pkgutil.iter_modules():
name = module_info.name
if name.startswith(prefixes):
try:
importlib.import_module(name)
imported.append(name)
logging.info("Imported third-party plugin: %s", name)
except Exception:
logging.exception("Could not import third-party plugin: %s", name)
failed.append(name)
logging.debug("Third-party plugin import summary: imported=%s failed=%s", imported, failed)
+20
View File
@@ -27,6 +27,7 @@ from statistics import mean
import numpy as np
import torch
from datasets.utils.logging import disable_progress_bar, enable_progress_bar
def inside_slurm():
@@ -247,6 +248,25 @@ def get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time_s: float):
return days, hours, minutes, seconds
class SuppressProgressBars:
"""
Context manager to suppress progress bars.
Example
--------
```python
with SuppressProgressBars():
# Code that would normally show progress bars
```
"""
def __enter__(self):
disable_progress_bar()
def __exit__(self, exc_type, exc_val, exc_tb):
enable_progress_bar()
class TimerManager:
"""
Lightweight utility to measure elapsed time.
+3 -1
View File
@@ -91,6 +91,9 @@ def test_async_inference_e2e(monkeypatch):
policy_server.policy = MockPolicy()
policy_server.actions_per_chunk = 20
policy_server.device = "cpu"
# NOTE(Steven): Smelly tests as the Server is a state machine being partially mocked. Adding these processors as a quick fix.
policy_server.preprocessor = lambda obs: obs
policy_server.postprocessor = lambda tensor: tensor
# Set up robot config and features
robot_config = MockRobotConfig()
@@ -136,7 +139,6 @@ def test_async_inference_e2e(monkeypatch):
policy_type="test",
pretrained_name_or_path="test",
actions_per_chunk=20,
verify_robot_cameras=False,
)
client = RobotClient(client_config)
+9 -19
View File
@@ -333,9 +333,8 @@ def test_raw_observation_to_observation_basic():
robot_obs = _create_mock_robot_observation()
lerobot_features = _create_mock_lerobot_features()
policy_image_features = _create_mock_policy_image_features()
device = "cpu"
observation = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features, device)
observation = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features)
# Check that all expected keys are present
assert OBS_STATE in observation
@@ -345,7 +344,6 @@ def test_raw_observation_to_observation_basic():
# Check state processing
state = observation[OBS_STATE]
assert isinstance(state, torch.Tensor)
assert state.device.type == device
assert state.shape == (1, 4) # Batched
# Check image processing
@@ -356,10 +354,6 @@ def test_raw_observation_to_observation_basic():
assert laptop_img.shape == (1, 3, 224, 224)
assert phone_img.shape == (1, 3, 160, 160)
# Check device placement
assert laptop_img.device.type == device
assert phone_img.device.type == device
# Check image dtype and range (should be float32 in [0, 1])
assert laptop_img.dtype == torch.float32
assert phone_img.dtype == torch.float32
@@ -374,9 +368,8 @@ def test_raw_observation_to_observation_with_non_tensor_data():
lerobot_features = _create_mock_lerobot_features()
policy_image_features = _create_mock_policy_image_features()
device = "cpu"
observation = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features, device)
observation = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features)
# Check that task string is preserved
assert "task" in observation
@@ -386,19 +379,17 @@ def test_raw_observation_to_observation_with_non_tensor_data():
@torch.no_grad()
def test_raw_observation_to_observation_device_handling():
"""Test that tensors are properly moved to the specified device."""
device = "mps" if torch.backends.mps.is_available() else "cpu"
"""Test that tensors are created (device placement is handled by preprocessor)."""
robot_obs = _create_mock_robot_observation()
lerobot_features = _create_mock_lerobot_features()
policy_image_features = _create_mock_policy_image_features()
observation = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features, device)
observation = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features)
# Check that all tensors are on the correct device
# Check that all expected keys produce tensors (device placement handled by preprocessor later)
for key, value in observation.items():
if isinstance(value, torch.Tensor):
assert value.device.type == device, f"Tensor {key} not on {device}"
assert value.device.type in ["cpu", "cuda", "mps"], f"Tensor {key} on unexpected device"
def test_raw_observation_to_observation_deterministic():
@@ -406,11 +397,10 @@ def test_raw_observation_to_observation_deterministic():
robot_obs = _create_mock_robot_observation()
lerobot_features = _create_mock_lerobot_features()
policy_image_features = _create_mock_policy_image_features()
device = "cpu"
# Run twice with same input
obs1 = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features, device)
obs2 = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features, device)
obs1 = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features)
obs2 = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features)
# Results should be identical
assert set(obs1.keys()) == set(obs2.keys())
@@ -448,7 +438,7 @@ def test_image_processing_pipeline_preserves_content():
)
}
observation = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features, "cpu")
observation = raw_observation_to_observation(robot_obs, lerobot_features, policy_image_features)
processed_img = observation[f"{OBS_IMAGES}.laptop"].squeeze(0) # Remove batch dim
@@ -196,6 +196,9 @@ def test_predict_action_chunk(monkeypatch, policy_server):
# Force server to act-style policy; patch method to return deterministic tensor
policy_server.policy_type = "act"
# NOTE(Steven): Smelly tests as the Server is a state machine being partially mocked. Adding these processors as a quick fix.
policy_server.preprocessor = lambda obs: obs
policy_server.postprocessor = lambda tensor: tensor
action_dim = 6
batch_size = 1
actions_per_chunk = policy_server.actions_per_chunk
@@ -51,7 +51,6 @@ def robot_client():
policy_type="test",
pretrained_name_or_path="test",
actions_per_chunk=20,
verify_robot_cameras=False,
)
client = RobotClient(test_config)
+90
View File
@@ -181,6 +181,54 @@ def assert_dataset_iteration_works(aggr_ds):
pass
def assert_video_timestamps_within_bounds(aggr_ds):
"""Test that all video timestamps are within valid bounds for their respective video files.
This catches bugs where timestamps point to frames beyond the actual video length,
which would cause "Invalid frame index" errors during data loading.
"""
try:
from torchcodec.decoders import VideoDecoder
except ImportError:
return
for ep_idx in range(aggr_ds.num_episodes):
ep = aggr_ds.meta.episodes[ep_idx]
for vid_key in aggr_ds.meta.video_keys:
from_ts = ep[f"videos/{vid_key}/from_timestamp"]
to_ts = ep[f"videos/{vid_key}/to_timestamp"]
video_path = aggr_ds.root / aggr_ds.meta.get_video_file_path(ep_idx, vid_key)
if not video_path.exists():
continue
from_frame_idx = round(from_ts * aggr_ds.fps)
to_frame_idx = round(to_ts * aggr_ds.fps)
try:
decoder = VideoDecoder(str(video_path))
num_frames = len(decoder)
# Verify timestamps don't exceed video bounds
assert from_frame_idx >= 0, (
f"Episode {ep_idx}, {vid_key}: from_frame_idx ({from_frame_idx}) < 0"
)
assert from_frame_idx < num_frames, (
f"Episode {ep_idx}, {vid_key}: from_frame_idx ({from_frame_idx}) >= video frames ({num_frames})"
)
assert to_frame_idx <= num_frames, (
f"Episode {ep_idx}, {vid_key}: to_frame_idx ({to_frame_idx}) > video frames ({num_frames})"
)
assert from_frame_idx < to_frame_idx, (
f"Episode {ep_idx}, {vid_key}: from_frame_idx ({from_frame_idx}) >= to_frame_idx ({to_frame_idx})"
)
except Exception as e:
raise AssertionError(
f"Failed to verify timestamps for episode {ep_idx}, {vid_key}: {e}"
) from e
def test_aggregate_datasets(tmp_path, lerobot_dataset_factory):
"""Test basic aggregation functionality with standard parameters."""
ds_0_num_frames = 400
@@ -227,6 +275,7 @@ def test_aggregate_datasets(tmp_path, lerobot_dataset_factory):
assert_metadata_consistency(aggr_ds, ds_0, ds_1)
assert_episode_indices_updated_correctly(aggr_ds, ds_0, ds_1)
assert_video_frames_integrity(aggr_ds, ds_0, ds_1)
assert_video_timestamps_within_bounds(aggr_ds)
assert_dataset_iteration_works(aggr_ds)
@@ -277,6 +326,7 @@ def test_aggregate_with_low_threshold(tmp_path, lerobot_dataset_factory):
assert_metadata_consistency(aggr_ds, ds_0, ds_1)
assert_episode_indices_updated_correctly(aggr_ds, ds_0, ds_1)
assert_video_frames_integrity(aggr_ds, ds_0, ds_1)
assert_video_timestamps_within_bounds(aggr_ds)
assert_dataset_iteration_works(aggr_ds)
# Check that multiple files were actually created due to small size limits
@@ -290,3 +340,43 @@ def test_aggregate_with_low_threshold(tmp_path, lerobot_dataset_factory):
if video_dir.exists():
video_files = list(video_dir.rglob("*.mp4"))
assert len(video_files) > 1, "Small file size limits should create multiple video files"
def test_video_timestamps_regression(tmp_path, lerobot_dataset_factory):
"""Regression test for video timestamp bug when merging datasets.
This test specifically checks that video timestamps are correctly calculated
and accumulated when merging multiple datasets.
"""
datasets = []
for i in range(3):
ds = lerobot_dataset_factory(
root=tmp_path / f"regression_{i}",
repo_id=f"{DUMMY_REPO_ID}_regression_{i}",
total_episodes=2,
total_frames=100,
)
datasets.append(ds)
aggregate_datasets(
repo_ids=[ds.repo_id for ds in datasets],
roots=[ds.root for ds in datasets],
aggr_repo_id=f"{DUMMY_REPO_ID}_regression_aggr",
aggr_root=tmp_path / "regression_aggr",
)
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "regression_aggr")
aggr_ds = LeRobotDataset(f"{DUMMY_REPO_ID}_regression_aggr", root=tmp_path / "regression_aggr")
assert_video_timestamps_within_bounds(aggr_ds)
for i in range(len(aggr_ds)):
item = aggr_ds[i]
for key in aggr_ds.meta.video_keys:
assert key in item, f"Video key {key} missing from item {i}"
assert item[key].shape[0] == 3, f"Expected 3 channels for video key {key}"
+895
View File
@@ -0,0 +1,895 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for dataset tools utilities."""
from unittest.mock import patch
import numpy as np
import pytest
import torch
from lerobot.datasets.dataset_tools import (
add_feature,
delete_episodes,
merge_datasets,
remove_feature,
split_dataset,
)
@pytest.fixture
def sample_dataset(tmp_path, empty_lerobot_dataset_factory):
"""Create a sample dataset for testing."""
features = {
"action": {"dtype": "float32", "shape": (6,), "names": None},
"observation.state": {"dtype": "float32", "shape": (4,), "names": None},
"observation.images.top": {"dtype": "image", "shape": (224, 224, 3), "names": None},
}
dataset = empty_lerobot_dataset_factory(
root=tmp_path / "test_dataset",
features=features,
)
for ep_idx in range(5):
for _ in range(10):
frame = {
"action": np.random.randn(6).astype(np.float32),
"observation.state": np.random.randn(4).astype(np.float32),
"observation.images.top": np.random.randint(0, 255, size=(224, 224, 3), dtype=np.uint8),
"task": f"task_{ep_idx % 2}",
}
dataset.add_frame(frame)
dataset.save_episode()
dataset.finalize()
return dataset
def test_delete_single_episode(sample_dataset, tmp_path):
"""Test deleting a single episode."""
output_dir = tmp_path / "filtered"
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(output_dir)
new_dataset = delete_episodes(
sample_dataset,
episode_indices=[2],
output_dir=output_dir,
)
assert new_dataset.meta.total_episodes == 4
assert new_dataset.meta.total_frames == 40
episode_indices = {int(idx.item()) for idx in new_dataset.hf_dataset["episode_index"]}
assert episode_indices == {0, 1, 2, 3}
assert len(new_dataset) == 40
def test_delete_multiple_episodes(sample_dataset, tmp_path):
"""Test deleting multiple episodes."""
output_dir = tmp_path / "filtered"
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(output_dir)
new_dataset = delete_episodes(
sample_dataset,
episode_indices=[1, 3],
output_dir=output_dir,
)
assert new_dataset.meta.total_episodes == 3
assert new_dataset.meta.total_frames == 30
episode_indices = {int(idx.item()) for idx in new_dataset.hf_dataset["episode_index"]}
assert episode_indices == {0, 1, 2}
def test_delete_invalid_episodes(sample_dataset, tmp_path):
"""Test error handling for invalid episode indices."""
with pytest.raises(ValueError, match="Invalid episode indices"):
delete_episodes(
sample_dataset,
episode_indices=[10, 20],
output_dir=tmp_path / "filtered",
)
def test_delete_all_episodes(sample_dataset, tmp_path):
"""Test error when trying to delete all episodes."""
with pytest.raises(ValueError, match="Cannot delete all episodes"):
delete_episodes(
sample_dataset,
episode_indices=list(range(5)),
output_dir=tmp_path / "filtered",
)
def test_delete_empty_list(sample_dataset, tmp_path):
"""Test error when no episodes specified."""
with pytest.raises(ValueError, match="No episodes to delete"):
delete_episodes(
sample_dataset,
episode_indices=[],
output_dir=tmp_path / "filtered",
)
def test_split_by_episodes(sample_dataset, tmp_path):
"""Test splitting dataset by specific episode indices."""
splits = {
"train": [0, 1, 2],
"val": [3, 4],
}
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
def mock_snapshot(repo_id, **kwargs):
if "train" in repo_id:
return str(tmp_path / f"{sample_dataset.repo_id}_train")
elif "val" in repo_id:
return str(tmp_path / f"{sample_dataset.repo_id}_val")
return str(kwargs.get("local_dir", tmp_path))
mock_snapshot_download.side_effect = mock_snapshot
result = split_dataset(
sample_dataset,
splits=splits,
output_dir=tmp_path,
)
assert set(result.keys()) == {"train", "val"}
assert result["train"].meta.total_episodes == 3
assert result["train"].meta.total_frames == 30
assert result["val"].meta.total_episodes == 2
assert result["val"].meta.total_frames == 20
train_episodes = {int(idx.item()) for idx in result["train"].hf_dataset["episode_index"]}
assert train_episodes == {0, 1, 2}
val_episodes = {int(idx.item()) for idx in result["val"].hf_dataset["episode_index"]}
assert val_episodes == {0, 1}
def test_split_by_fractions(sample_dataset, tmp_path):
"""Test splitting dataset by fractions."""
splits = {
"train": 0.6,
"val": 0.4,
}
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
def mock_snapshot(repo_id, **kwargs):
for split_name in splits:
if split_name in repo_id:
return str(tmp_path / f"{sample_dataset.repo_id}_{split_name}")
return str(kwargs.get("local_dir", tmp_path))
mock_snapshot_download.side_effect = mock_snapshot
result = split_dataset(
sample_dataset,
splits=splits,
output_dir=tmp_path,
)
assert result["train"].meta.total_episodes == 3
assert result["val"].meta.total_episodes == 2
def test_split_overlapping_episodes(sample_dataset, tmp_path):
"""Test error when episodes appear in multiple splits."""
splits = {
"train": [0, 1, 2],
"val": [2, 3, 4],
}
with pytest.raises(ValueError, match="Episodes cannot appear in multiple splits"):
split_dataset(sample_dataset, splits=splits, output_dir=tmp_path)
def test_split_invalid_fractions(sample_dataset, tmp_path):
"""Test error when fractions sum to more than 1."""
splits = {
"train": 0.7,
"val": 0.5,
}
with pytest.raises(ValueError, match="Split fractions must sum to <= 1.0"):
split_dataset(sample_dataset, splits=splits, output_dir=tmp_path)
def test_split_empty(sample_dataset, tmp_path):
"""Test error with empty splits."""
with pytest.raises(ValueError, match="No splits provided"):
split_dataset(sample_dataset, splits={}, output_dir=tmp_path)
def test_merge_two_datasets(sample_dataset, tmp_path, empty_lerobot_dataset_factory):
"""Test merging two datasets."""
features = {
"action": {"dtype": "float32", "shape": (6,), "names": None},
"observation.state": {"dtype": "float32", "shape": (4,), "names": None},
"observation.images.top": {"dtype": "image", "shape": (224, 224, 3), "names": None},
}
dataset2 = empty_lerobot_dataset_factory(
root=tmp_path / "test_dataset2",
features=features,
)
for ep_idx in range(3):
for _ in range(10):
frame = {
"action": np.random.randn(6).astype(np.float32),
"observation.state": np.random.randn(4).astype(np.float32),
"observation.images.top": np.random.randint(0, 255, size=(224, 224, 3), dtype=np.uint8),
"task": f"task_{ep_idx % 2}",
}
dataset2.add_frame(frame)
dataset2.save_episode()
dataset2.finalize()
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "merged_dataset")
merged = merge_datasets(
[sample_dataset, dataset2],
output_repo_id="merged_dataset",
output_dir=tmp_path / "merged_dataset",
)
assert merged.meta.total_episodes == 8 # 5 + 3
assert merged.meta.total_frames == 80 # 50 + 30
episode_indices = sorted({int(idx.item()) for idx in merged.hf_dataset["episode_index"]})
assert episode_indices == list(range(8))
def test_merge_empty_list(tmp_path):
"""Test error when merging empty list."""
with pytest.raises(ValueError, match="No datasets to merge"):
merge_datasets([], output_repo_id="merged", output_dir=tmp_path)
def test_add_feature_with_values(sample_dataset, tmp_path):
"""Test adding a feature with pre-computed values."""
num_frames = sample_dataset.meta.total_frames
reward_values = np.random.randn(num_frames, 1).astype(np.float32)
feature_info = {
"dtype": "float32",
"shape": (1,),
"names": None,
}
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "with_reward")
new_dataset = add_feature(
sample_dataset,
feature_name="reward",
feature_values=reward_values,
feature_info=feature_info,
output_dir=tmp_path / "with_reward",
)
assert "reward" in new_dataset.meta.features
assert new_dataset.meta.features["reward"] == feature_info
assert len(new_dataset) == num_frames
sample_item = new_dataset[0]
assert "reward" in sample_item
assert isinstance(sample_item["reward"], torch.Tensor)
def test_add_feature_with_callable(sample_dataset, tmp_path):
"""Test adding a feature with a callable."""
def compute_reward(frame_dict, episode_idx, frame_idx):
return float(episode_idx * 10 + frame_idx)
feature_info = {
"dtype": "float32",
"shape": (1,),
"names": None,
}
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "with_reward")
new_dataset = add_feature(
sample_dataset,
feature_name="reward",
feature_values=compute_reward,
feature_info=feature_info,
output_dir=tmp_path / "with_reward",
)
assert "reward" in new_dataset.meta.features
items = [new_dataset[i] for i in range(10)]
first_episode_items = [item for item in items if item["episode_index"] == 0]
assert len(first_episode_items) == 10
first_frame = first_episode_items[0]
assert first_frame["frame_index"] == 0
assert float(first_frame["reward"]) == 0.0
def test_add_existing_feature(sample_dataset, tmp_path):
"""Test error when adding an existing feature."""
feature_info = {"dtype": "float32", "shape": (1,)}
with pytest.raises(ValueError, match="Feature 'action' already exists"):
add_feature(
sample_dataset,
feature_name="action",
feature_values=np.zeros(50),
feature_info=feature_info,
output_dir=tmp_path / "modified",
)
def test_add_feature_invalid_info(sample_dataset, tmp_path):
"""Test error with invalid feature info."""
with pytest.raises(ValueError, match="feature_info must contain keys"):
add_feature(
sample_dataset,
feature_name="reward",
feature_values=np.zeros(50),
feature_info={"dtype": "float32"},
output_dir=tmp_path / "modified",
)
def test_remove_single_feature(sample_dataset, tmp_path):
"""Test removing a single feature."""
feature_info = {"dtype": "float32", "shape": (1,), "names": None}
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.side_effect = lambda repo_id, **kwargs: str(kwargs.get("local_dir", tmp_path))
dataset_with_reward = add_feature(
sample_dataset,
feature_name="reward",
feature_values=np.random.randn(50, 1).astype(np.float32),
feature_info=feature_info,
output_dir=tmp_path / "with_reward",
)
dataset_without_reward = remove_feature(
dataset_with_reward,
feature_names="reward",
output_dir=tmp_path / "without_reward",
)
assert "reward" not in dataset_without_reward.meta.features
sample_item = dataset_without_reward[0]
assert "reward" not in sample_item
def test_remove_multiple_features(sample_dataset, tmp_path):
"""Test removing multiple features at once."""
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.side_effect = lambda repo_id, **kwargs: str(kwargs.get("local_dir", tmp_path))
dataset = sample_dataset
for feature_name in ["reward", "success"]:
feature_info = {"dtype": "float32", "shape": (1,), "names": None}
dataset = add_feature(
dataset,
feature_name=feature_name,
feature_values=np.random.randn(dataset.meta.total_frames, 1).astype(np.float32),
feature_info=feature_info,
output_dir=tmp_path / f"with_{feature_name}",
)
dataset_clean = remove_feature(
dataset,
feature_names=["reward", "success"],
output_dir=tmp_path / "clean",
)
assert "reward" not in dataset_clean.meta.features
assert "success" not in dataset_clean.meta.features
def test_remove_nonexistent_feature(sample_dataset, tmp_path):
"""Test error when removing non-existent feature."""
with pytest.raises(ValueError, match="Feature 'nonexistent' not found"):
remove_feature(
sample_dataset,
feature_names="nonexistent",
output_dir=tmp_path / "modified",
)
def test_remove_required_feature(sample_dataset, tmp_path):
"""Test error when trying to remove required features."""
with pytest.raises(ValueError, match="Cannot remove required features"):
remove_feature(
sample_dataset,
feature_names="timestamp",
output_dir=tmp_path / "modified",
)
def test_remove_camera_feature(sample_dataset, tmp_path):
"""Test removing a camera feature."""
camera_keys = sample_dataset.meta.camera_keys
if not camera_keys:
pytest.skip("No camera keys in dataset")
camera_to_remove = camera_keys[0]
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "without_camera")
dataset_without_camera = remove_feature(
sample_dataset,
feature_names=camera_to_remove,
output_dir=tmp_path / "without_camera",
)
assert camera_to_remove not in dataset_without_camera.meta.features
assert camera_to_remove not in dataset_without_camera.meta.camera_keys
sample_item = dataset_without_camera[0]
assert camera_to_remove not in sample_item
def test_complex_workflow_integration(sample_dataset, tmp_path):
"""Test a complex workflow combining multiple operations."""
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.side_effect = lambda repo_id, **kwargs: str(kwargs.get("local_dir", tmp_path))
dataset = add_feature(
sample_dataset,
feature_name="reward",
feature_values=np.random.randn(50, 1).astype(np.float32),
feature_info={"dtype": "float32", "shape": (1,), "names": None},
output_dir=tmp_path / "step1",
)
dataset = delete_episodes(
dataset,
episode_indices=[2],
output_dir=tmp_path / "step2",
)
splits = split_dataset(
dataset,
splits={"train": 0.75, "val": 0.25},
output_dir=tmp_path / "step3",
)
merged = merge_datasets(
list(splits.values()),
output_repo_id="final_dataset",
output_dir=tmp_path / "step4",
)
assert merged.meta.total_episodes == 4
assert merged.meta.total_frames == 40
assert "reward" in merged.meta.features
assert len(merged) == 40
sample_item = merged[0]
assert "reward" in sample_item
def test_delete_episodes_preserves_stats(sample_dataset, tmp_path):
"""Test that deleting episodes preserves statistics correctly."""
output_dir = tmp_path / "filtered"
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(output_dir)
new_dataset = delete_episodes(
sample_dataset,
episode_indices=[2],
output_dir=output_dir,
)
assert new_dataset.meta.stats is not None
for feature in ["action", "observation.state"]:
assert feature in new_dataset.meta.stats
assert "mean" in new_dataset.meta.stats[feature]
assert "std" in new_dataset.meta.stats[feature]
def test_delete_episodes_preserves_tasks(sample_dataset, tmp_path):
"""Test that tasks are preserved correctly after deletion."""
output_dir = tmp_path / "filtered"
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(output_dir)
new_dataset = delete_episodes(
sample_dataset,
episode_indices=[0],
output_dir=output_dir,
)
assert new_dataset.meta.tasks is not None
assert len(new_dataset.meta.tasks) == 2
tasks_in_dataset = {str(item["task"]) for item in new_dataset}
assert len(tasks_in_dataset) > 0
def test_split_three_ways(sample_dataset, tmp_path):
"""Test splitting dataset into three splits."""
splits = {
"train": 0.6,
"val": 0.2,
"test": 0.2,
}
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
def mock_snapshot(repo_id, **kwargs):
for split_name in splits:
if split_name in repo_id:
return str(tmp_path / f"{sample_dataset.repo_id}_{split_name}")
return str(kwargs.get("local_dir", tmp_path))
mock_snapshot_download.side_effect = mock_snapshot
result = split_dataset(
sample_dataset,
splits=splits,
output_dir=tmp_path,
)
assert set(result.keys()) == {"train", "val", "test"}
assert result["train"].meta.total_episodes == 3
assert result["val"].meta.total_episodes == 1
assert result["test"].meta.total_episodes == 1
total_frames = sum(ds.meta.total_frames for ds in result.values())
assert total_frames == sample_dataset.meta.total_frames
def test_split_preserves_stats(sample_dataset, tmp_path):
"""Test that statistics are preserved when splitting."""
splits = {"train": [0, 1, 2], "val": [3, 4]}
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
def mock_snapshot(repo_id, **kwargs):
for split_name in splits:
if split_name in repo_id:
return str(tmp_path / f"{sample_dataset.repo_id}_{split_name}")
return str(kwargs.get("local_dir", tmp_path))
mock_snapshot_download.side_effect = mock_snapshot
result = split_dataset(
sample_dataset,
splits=splits,
output_dir=tmp_path,
)
for split_ds in result.values():
assert split_ds.meta.stats is not None
for feature in ["action", "observation.state"]:
assert feature in split_ds.meta.stats
assert "mean" in split_ds.meta.stats[feature]
assert "std" in split_ds.meta.stats[feature]
def test_merge_three_datasets(sample_dataset, tmp_path, empty_lerobot_dataset_factory):
"""Test merging three datasets."""
features = {
"action": {"dtype": "float32", "shape": (6,), "names": None},
"observation.state": {"dtype": "float32", "shape": (4,), "names": None},
"observation.images.top": {"dtype": "image", "shape": (224, 224, 3), "names": None},
}
datasets = [sample_dataset]
for i in range(2):
dataset = empty_lerobot_dataset_factory(
root=tmp_path / f"test_dataset{i + 2}",
features=features,
)
for ep_idx in range(2):
for _ in range(10):
frame = {
"action": np.random.randn(6).astype(np.float32),
"observation.state": np.random.randn(4).astype(np.float32),
"observation.images.top": np.random.randint(0, 255, size=(224, 224, 3), dtype=np.uint8),
"task": f"task_{ep_idx}",
}
dataset.add_frame(frame)
dataset.save_episode()
dataset.finalize()
datasets.append(dataset)
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "merged_dataset")
merged = merge_datasets(
datasets,
output_repo_id="merged_dataset",
output_dir=tmp_path / "merged_dataset",
)
assert merged.meta.total_episodes == 9
assert merged.meta.total_frames == 90
def test_merge_preserves_stats(sample_dataset, tmp_path, empty_lerobot_dataset_factory):
"""Test that statistics are computed for merged datasets."""
features = {
"action": {"dtype": "float32", "shape": (6,), "names": None},
"observation.state": {"dtype": "float32", "shape": (4,), "names": None},
"observation.images.top": {"dtype": "image", "shape": (224, 224, 3), "names": None},
}
dataset2 = empty_lerobot_dataset_factory(
root=tmp_path / "test_dataset2",
features=features,
)
for ep_idx in range(3):
for _ in range(10):
frame = {
"action": np.random.randn(6).astype(np.float32),
"observation.state": np.random.randn(4).astype(np.float32),
"observation.images.top": np.random.randint(0, 255, size=(224, 224, 3), dtype=np.uint8),
"task": f"task_{ep_idx % 2}",
}
dataset2.add_frame(frame)
dataset2.save_episode()
dataset2.finalize()
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "merged_dataset")
merged = merge_datasets(
[sample_dataset, dataset2],
output_repo_id="merged_dataset",
output_dir=tmp_path / "merged_dataset",
)
assert merged.meta.stats is not None
for feature in ["action", "observation.state"]:
assert feature in merged.meta.stats
assert "mean" in merged.meta.stats[feature]
assert "std" in merged.meta.stats[feature]
def test_add_feature_preserves_existing_stats(sample_dataset, tmp_path):
"""Test that adding a feature preserves existing stats."""
num_frames = sample_dataset.meta.total_frames
reward_values = np.random.randn(num_frames, 1).astype(np.float32)
feature_info = {
"dtype": "float32",
"shape": (1,),
"names": None,
}
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(tmp_path / "with_reward")
new_dataset = add_feature(
sample_dataset,
feature_name="reward",
feature_values=reward_values,
feature_info=feature_info,
output_dir=tmp_path / "with_reward",
)
assert new_dataset.meta.stats is not None
for feature in ["action", "observation.state"]:
assert feature in new_dataset.meta.stats
assert "mean" in new_dataset.meta.stats[feature]
assert "std" in new_dataset.meta.stats[feature]
def test_remove_feature_updates_stats(sample_dataset, tmp_path):
"""Test that removing a feature removes it from stats."""
feature_info = {"dtype": "float32", "shape": (1,), "names": None}
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.side_effect = lambda repo_id, **kwargs: str(kwargs.get("local_dir", tmp_path))
dataset_with_reward = add_feature(
sample_dataset,
feature_name="reward",
feature_values=np.random.randn(50, 1).astype(np.float32),
feature_info=feature_info,
output_dir=tmp_path / "with_reward",
)
dataset_without_reward = remove_feature(
dataset_with_reward,
feature_names="reward",
output_dir=tmp_path / "without_reward",
)
if dataset_without_reward.meta.stats:
assert "reward" not in dataset_without_reward.meta.stats
def test_delete_consecutive_episodes(sample_dataset, tmp_path):
"""Test deleting consecutive episodes."""
output_dir = tmp_path / "filtered"
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(output_dir)
new_dataset = delete_episodes(
sample_dataset,
episode_indices=[1, 2, 3],
output_dir=output_dir,
)
assert new_dataset.meta.total_episodes == 2
assert new_dataset.meta.total_frames == 20
episode_indices = sorted({int(idx.item()) for idx in new_dataset.hf_dataset["episode_index"]})
assert episode_indices == [0, 1]
def test_delete_first_and_last_episodes(sample_dataset, tmp_path):
"""Test deleting first and last episodes."""
output_dir = tmp_path / "filtered"
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(output_dir)
new_dataset = delete_episodes(
sample_dataset,
episode_indices=[0, 4],
output_dir=output_dir,
)
assert new_dataset.meta.total_episodes == 3
assert new_dataset.meta.total_frames == 30
episode_indices = sorted({int(idx.item()) for idx in new_dataset.hf_dataset["episode_index"]})
assert episode_indices == [0, 1, 2]
def test_split_all_episodes_assigned(sample_dataset, tmp_path):
"""Test that all episodes can be explicitly assigned to splits."""
splits = {
"split1": [0, 1],
"split2": [2, 3],
"split3": [4],
}
with (
patch("lerobot.datasets.lerobot_dataset.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.lerobot_dataset.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
def mock_snapshot(repo_id, **kwargs):
for split_name in splits:
if split_name in repo_id:
return str(tmp_path / f"{sample_dataset.repo_id}_{split_name}")
return str(kwargs.get("local_dir", tmp_path))
mock_snapshot_download.side_effect = mock_snapshot
result = split_dataset(
sample_dataset,
splits=splits,
output_dir=tmp_path,
)
total_episodes = sum(ds.meta.total_episodes for ds in result.values())
assert total_episodes == sample_dataset.meta.total_episodes
+143
View File
@@ -806,6 +806,8 @@ def test_episode_index_distribution(tmp_path, empty_lerobot_dataset_factory):
dataset.add_frame({"state": torch.randn(2), "task": f"task_{episode_idx}"})
dataset.save_episode()
dataset.finalize()
# Load the dataset and check episode indices
loaded_dataset = LeRobotDataset(dataset.repo_id, root=dataset.root)
@@ -855,6 +857,8 @@ def test_multi_episode_metadata_consistency(tmp_path, empty_lerobot_dataset_fact
dataset.add_frame({"state": torch.randn(3), ACTION: torch.randn(2), "task": tasks[episode_idx]})
dataset.save_episode()
dataset.finalize()
# Load and validate episode metadata
loaded_dataset = LeRobotDataset(dataset.repo_id, root=dataset.root)
@@ -893,6 +897,8 @@ def test_data_consistency_across_episodes(tmp_path, empty_lerobot_dataset_factor
dataset.add_frame({"state": torch.randn(1), "task": "consistency_test"})
dataset.save_episode()
dataset.finalize()
loaded_dataset = LeRobotDataset(dataset.repo_id, root=dataset.root)
# Check data consistency - no gaps or overlaps
@@ -944,6 +950,8 @@ def test_statistics_metadata_validation(tmp_path, empty_lerobot_dataset_factory)
dataset.add_frame({"state": state_data, ACTION: action_data, "task": "stats_test"})
dataset.save_episode()
dataset.finalize()
loaded_dataset = LeRobotDataset(dataset.repo_id, root=dataset.root)
# Check that statistics exist for all features
@@ -989,6 +997,8 @@ def test_episode_boundary_integrity(tmp_path, empty_lerobot_dataset_factory):
dataset.add_frame({"state": torch.tensor([float(frame_idx)]), "task": f"episode_{episode_idx}"})
dataset.save_episode()
dataset.finalize()
loaded_dataset = LeRobotDataset(dataset.repo_id, root=dataset.root)
# Test episode boundaries
@@ -1031,6 +1041,8 @@ def test_task_indexing_and_validation(tmp_path, empty_lerobot_dataset_factory):
dataset.add_frame({"state": torch.randn(1), "task": task})
dataset.save_episode()
dataset.finalize()
loaded_dataset = LeRobotDataset(dataset.repo_id, root=dataset.root)
# Check that all unique tasks are in the tasks metadata
@@ -1056,3 +1068,134 @@ def test_task_indexing_and_validation(tmp_path, empty_lerobot_dataset_factory):
# Check total number of tasks
assert loaded_dataset.meta.total_tasks == len(unique_tasks)
def test_dataset_resume_recording(tmp_path, empty_lerobot_dataset_factory):
"""Test that resuming dataset recording preserves previously recorded episodes.
This test validates the critical resume functionality by:
1. Recording initial episodes and finalizing
2. Reopening the dataset
3. Recording additional episodes
4. Verifying all data (old + new) is intact
This specifically tests the bug fix where parquet files were being overwritten
instead of appended to during resume.
"""
features = {
"observation.state": {"dtype": "float32", "shape": (2,), "names": ["x", "y"]},
"action": {"dtype": "float32", "shape": (2,), "names": ["x", "y"]},
}
dataset = empty_lerobot_dataset_factory(root=tmp_path / "test", features=features, use_videos=False)
initial_episodes = 2
frames_per_episode = 3
for ep_idx in range(initial_episodes):
for frame_idx in range(frames_per_episode):
dataset.add_frame(
{
"observation.state": torch.tensor([float(ep_idx), float(frame_idx)]),
"action": torch.tensor([0.5, 0.5]),
"task": f"task_{ep_idx}",
}
)
dataset.save_episode()
assert dataset.meta.total_episodes == initial_episodes
assert dataset.meta.total_frames == initial_episodes * frames_per_episode
dataset.finalize()
initial_root = dataset.root
initial_repo_id = dataset.repo_id
del dataset
dataset_verify = LeRobotDataset(initial_repo_id, root=initial_root, revision="v3.0")
assert dataset_verify.meta.total_episodes == initial_episodes
assert dataset_verify.meta.total_frames == initial_episodes * frames_per_episode
assert len(dataset_verify.hf_dataset) == initial_episodes * frames_per_episode
for idx in range(len(dataset_verify.hf_dataset)):
item = dataset_verify[idx]
expected_ep = idx // frames_per_episode
expected_frame = idx % frames_per_episode
assert item["episode_index"].item() == expected_ep
assert item["frame_index"].item() == expected_frame
assert item["index"].item() == idx
assert item["observation.state"][0].item() == float(expected_ep)
assert item["observation.state"][1].item() == float(expected_frame)
del dataset_verify
# Phase 3: Resume recording - add more episodes
dataset_resumed = LeRobotDataset(initial_repo_id, root=initial_root, revision="v3.0")
assert dataset_resumed.meta.total_episodes == initial_episodes
assert dataset_resumed.meta.total_frames == initial_episodes * frames_per_episode
assert dataset_resumed.latest_episode is None # Not recording yet
assert dataset_resumed.writer is None
assert dataset_resumed.meta.writer is None
additional_episodes = 2
for ep_idx in range(initial_episodes, initial_episodes + additional_episodes):
for frame_idx in range(frames_per_episode):
dataset_resumed.add_frame(
{
"observation.state": torch.tensor([float(ep_idx), float(frame_idx)]),
"action": torch.tensor([0.5, 0.5]),
"task": f"task_{ep_idx}",
}
)
dataset_resumed.save_episode()
total_episodes = initial_episodes + additional_episodes
total_frames = total_episodes * frames_per_episode
assert dataset_resumed.meta.total_episodes == total_episodes
assert dataset_resumed.meta.total_frames == total_frames
dataset_resumed.finalize()
del dataset_resumed
dataset_final = LeRobotDataset(initial_repo_id, root=initial_root, revision="v3.0")
assert dataset_final.meta.total_episodes == total_episodes
assert dataset_final.meta.total_frames == total_frames
assert len(dataset_final.hf_dataset) == total_frames
for idx in range(total_frames):
item = dataset_final[idx]
expected_ep = idx // frames_per_episode
expected_frame = idx % frames_per_episode
assert item["episode_index"].item() == expected_ep, (
f"Frame {idx}: wrong episode_index. Expected {expected_ep}, got {item['episode_index'].item()}"
)
assert item["frame_index"].item() == expected_frame, (
f"Frame {idx}: wrong frame_index. Expected {expected_frame}, got {item['frame_index'].item()}"
)
assert item["index"].item() == idx, (
f"Frame {idx}: wrong index. Expected {idx}, got {item['index'].item()}"
)
# Verify data integrity
assert item["observation.state"][0].item() == float(expected_ep), (
f"Frame {idx}: wrong observation.state[0]. Expected {float(expected_ep)}, "
f"got {item['observation.state'][0].item()}"
)
assert item["observation.state"][1].item() == float(expected_frame), (
f"Frame {idx}: wrong observation.state[1]. Expected {float(expected_frame)}, "
f"got {item['observation.state'][1].item()}"
)
assert len(dataset_final.meta.episodes) == total_episodes
for ep_idx in range(total_episodes):
ep_metadata = dataset_final.meta.episodes[ep_idx]
assert ep_metadata["episode_index"] == ep_idx
assert ep_metadata["length"] == frames_per_episode
assert ep_metadata["tasks"] == [f"task_{ep_idx}"]
expected_from = ep_idx * frames_per_episode
expected_to = (ep_idx + 1) * frames_per_episode
assert ep_metadata["dataset_from_index"] == expected_from
assert ep_metadata["dataset_to_index"] == expected_to
+16
View File
@@ -1,3 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import types
from unittest.mock import MagicMock
+14
View File
@@ -1,5 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test script to verify PI0 policy integration with LeRobot, only meant to be run locally!"""
import os
+14
View File
@@ -1,5 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test script to verify PI0.5 (pi05) support in PI0 policy, only meant to be run locally!"""
import os
@@ -1,3 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test script to verify PI0OpenPI policy integration with LeRobot vs the original implementation, only meant to be run locally!"""
import os
@@ -1,3 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test script to verify PI0 policy integration with LeRobot vs the original implementation, only meant to be run locally!"""
import os
+16
View File
@@ -1,3 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from lerobot.processor import DataProcessorPipeline, TransitionKey
+16
View File
@@ -1,3 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pytest
import torch
@@ -1,3 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tests for the TokenizerProcessorStep class.
"""
+4 -1
View File
@@ -1,4 +1,6 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -11,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from pathlib import Path
from typing import Any
+4 -1
View File
@@ -1,4 +1,6 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -11,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from lerobot.utils.logging_utils import AverageMeter, MetricsTracker
+4 -1
View File
@@ -1,4 +1,6 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -11,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import numpy as np
+4 -1
View File
@@ -1,4 +1,6 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -11,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pathlib import Path
from unittest.mock import Mock, patch
+16
View File
@@ -1,3 +1,19 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import sys
from types import SimpleNamespace