Compare commits

..

8 Commits

Author SHA1 Message Date
Michel Aractingi 73780046b2 Revert accidental changes to dataset files 2025-12-12 16:39:23 +00:00
Michel Aractingi 093a85f946 nit 2025-12-12 16:36:18 +00:00
Michel Aractingi a669049da2 Add option to use the fused optim version of ADamW 2025-12-12 16:31:03 +00:00
Michel Aractingi ce348a3460 enable variable image sizes to pi0/pi0.5 (#2609)
* enable variable image sizes to pi0/pi0.5

* add square image assertion
2025-12-10 19:41:11 +01:00
Jade Choghari cb920235c4 docs: update X-VLA training strategies/commands (#2611) 2025-12-09 19:08:09 +01:00
Jade Choghari 7f40b3bf82 feat(dataset): add tool to convert images to video datasets (#2560)
* add video encoding tool

* style

* make it work

* more fixes
2025-12-08 18:50:21 +01:00
Michel Aractingi 2e9c9fd832 Replay while loop in sample actions with for loops (#2600) 2025-12-08 14:47:54 +01:00
Steven Palma f9cb5e659c chore(ci): skip workflows if not lerobot repository (#2601)
Co-authored-by: Alex Tyshka <atyshka15@gmail.com>
2025-12-08 12:44:36 +01:00
22 changed files with 112 additions and 1101 deletions
@@ -31,7 +31,8 @@ jobs:
name: Upload Preview and Comment
if: >
github.event.workflow_run.event == 'pull_request' &&
github.event.workflow_run.conclusion == 'success'
github.event.workflow_run.conclusion == 'success' &&
github.repository == 'huggingface/lerobot'
uses: huggingface/doc-builder/.github/workflows/upload_pr_documentation.yml@main
with:
package_name: lerobot
+4 -2
View File
@@ -42,7 +42,9 @@ jobs:
# This job builds and deploys the official documentation.
build_main_docs:
name: Build Main Docs
if: github.event_name == 'push' || github.event_name == 'workflow_dispatch'
if: >
(github.event_name == 'push' || github.event_name == 'workflow_dispatch') &&
github.repository == 'huggingface/lerobot'
permissions:
contents: read
uses: huggingface/doc-builder/.github/workflows/build_main_documentation.yml@main
@@ -58,7 +60,7 @@ jobs:
# The result of this job triggers the 'Upload PR Documentation' workflow.
build_pr_docs:
name: Build PR Docs
if: github.event_name == 'pull_request'
if: github.event_name == 'pull_request' && github.repository == 'huggingface/lerobot'
permissions:
contents: read
pull-requests: write
-1
View File
@@ -45,7 +45,6 @@ permissions:
env:
UV_VERSION: "0.8.0"
PYTHON_VERSION: "3.10"
DOCKER_IMAGE_NAME: huggingface/lerobot-gpu
# Ensures that only the latest commit for a PR or branch is built, canceling older runs.
concurrency:
+2
View File
@@ -43,6 +43,7 @@ jobs:
name: Build CPU Docker for Nightly
runs-on:
group: aws-general-8-plus
if: github.repository == 'huggingface/lerobot'
outputs:
image_tag: ${{ env.DOCKER_IMAGE_NAME_CPU }}
steps:
@@ -77,6 +78,7 @@ jobs:
name: Build GPU Docker for Nightly
runs-on:
group: aws-general-8-plus
if: github.repository == 'huggingface/lerobot'
outputs:
image_tag: ${{ env.DOCKER_IMAGE_NAME_GPU }}
steps:
+1
View File
@@ -29,6 +29,7 @@ jobs:
build-and-publish:
name: Build and publish Python distributions
runs-on: ubuntu-latest
if: github.repository == 'huggingface/lerobot'
outputs:
version: ${{ steps.extract_info.outputs.tag_version }}
permissions:
+1
View File
@@ -45,6 +45,7 @@ jobs:
stale:
name: Close Stale Issues and PRs
runs-on: ubuntu-latest
if: github.repository == 'huggingface/lerobot'
permissions:
actions: write
contents: write # only for delete-branch option
+1
View File
@@ -43,6 +43,7 @@ jobs:
full-tests:
name: Full Unbound Tests
runs-on: ubuntu-latest
if: github.repository == 'huggingface/lerobot'
env:
MUJOCO_GL: egl
HF_HOME: /mnt/cache/.cache/huggingface
+40 -82
View File
@@ -24,7 +24,7 @@ Built from pure Transformer encoders, X-VLA scales naturally with model size and
<img
src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/lerobot/xvla-architecture2.png"
alt="XVLA Architecture 2"
style="width: 32%; max-width: 450px; height: auto;"
style="width: 60%; height: auto;"
/>
</p>
@@ -120,7 +120,7 @@ Adapted for Google Robot platforms.
### Recommended Training Configuration
When fine-tuning X-VLA for a new embodiment or task, we recommend the following freezing strategy:
When fine-tuning X-VLA for a new embodiment or task, we recommend not freezing the VLM, and also setting the `policy.dtype=bfloat16` to not hit OOM errors.
```bash
lerobot-train \
@@ -129,25 +129,26 @@ lerobot-train \
--job_name=xvla_training \
--policy.path="lerobot/xvla-base" \
--policy.repo_id="HF_USER/xvla-your-robot" \
--policy.dtype=bfloat16 \
--steps=3000 \
--policy.device=cuda \
--policy.freeze_vision_encoder=True \
--policy.freeze_language_encoder=True \
--policy.train_policy_transformer=True \
--policy.train_soft_prompts=True \
--policy.freeze_vision_encoder=false \
--policy.freeze_language_encoder=false \
--policy.train_policy_transformer=true \
--policy.train_soft_prompts=true \
--policy.action_mode=YOUR_ACTION_MODE
```
### Training Parameters Explained
| Parameter | Default | Description |
| -------------------------- | ------- | ---------------------------------------- |
| `freeze_vision_encoder` | `True` | Freeze the VLM vision encoder weights |
| `freeze_language_encoder` | `True` | Freeze the VLM language encoder weights |
| `train_policy_transformer` | `True` | Allow policy transformer layers to train |
| `train_soft_prompts` | `True` | Allow soft prompts to train |
| Parameter | Default | Description |
| -------------------------- | ------- | ---------------------------------------------- |
| `freeze_vision_encoder` | `false` | Do not freeze the VLM vision encoder weights |
| `freeze_language_encoder` | `false` | Do not freeze the VLM language encoder weights |
| `train_policy_transformer` | `true` | Allow policy transformer layers to train |
| `train_soft_prompts` | `true` | Allow soft prompts to train |
**💡 Best Practice**: For Phase II adaptation to new embodiments, freeze the VLM encoders and only train the policy transformer and soft prompts. This provides excellent sample efficiency with minimal compute.
**💡 Best Practice**: For Phase II adaptation to new embodiments, do not freeze the VLM encoders and also train the policy transformer and soft prompts.
### Example: Training on Bimanual Robot
@@ -157,14 +158,15 @@ lerobot-train \
--output_dir=./outputs/xvla_bimanual \
--job_name=xvla_so101_training \
--policy.path="lerobot/xvla-base" \
--policy.dtype=bfloat16 \
--policy.repo_id="YOUR_USERNAME/xvla-biso101" \
--steps=3000 \
--policy.device=cuda \
--policy.action_mode=so101_bimanual \
--policy.freeze_vision_encoder=True \
--policy.freeze_language_encoder=True \
--policy.train_policy_transformer=True \
--policy.train_soft_prompts=True
--policy.freeze_vision_encoder=false \
--policy.freeze_language_encoder=false \
--policy.train_policy_transformer=true \
--policy.train_soft_prompts=true
```
💡 **Best Performance:** If you have sufficient computational resources and want to achieve best X-VLA finetuning performance, you should follow the official finetuning strategy:
@@ -172,71 +174,7 @@ lerobot-train \
**🔥 Full-finetune all components with a custom learning-rate scheme**
To ensure stable optimization, the Vision-Language Model (VLM) must be trained with only 1/10 of the base learning rate, while all other components use the full LR.
This LR ratio is crucial for achieving strong and stable finetuning performance.
To enable this behavior, you must:
1. Implement a custom optimizer and register it in your training config
```
from dataclasses import dataclass, asdict
from lerobot.optim.optimizers import OptimizerConfig
import torch
@OptimizerConfig.register_subclass("xvla-adamw")
@dataclass
class XVLAAdamW(OptimizerConfig):
lr: float = 1e-4
betas: tuple[float, float] = (0.9, 0.99)
eps: float = 1e-8
weight_decay: float = 0.0
grad_clip_norm: float = 10.0
def build(self, params: dict) -> torch.optim.Optimizer:
"""
Expect `named_parameters()` as input.
Apply lr = lr / 10 for all VLM-related parameters.
"""
assert isinstance(params, dict), \
"Custom LR optimizer requires `named_parameters()` as inputs."
kwargs = asdict(self)
kwargs.pop("grad_clip_norm")
vlm_group, other_group = [], []
for name, p in params.items():
if not p.requires_grad:
continue
if "vlm" in name.lower():
vlm_group.append(p)
else:
other_group.append(p)
param_groups = [
{"params": vlm_group, "lr": self.lr * 0.1, "weight_decay": self.weight_decay * 0.1},
{"params": other_group, "lr": self.lr, "weight_decay": self.weight_decay},
]
return torch.optim.AdamW(param_groups, **kwargs)
```
2. Modify X-VLAs get_optim_params to return named parameters
Replace:
```
def get_optim_params(self) -> dict:
"""Return only trainable parameters for optimization."""
return filter(lambda p: p.requires_grad, self.parameters())
```
with:
```
def get_optim_params(self):
"""Return trainable named parameters."""
return filter(lambda kv: kv[1].requires_grad, self.named_parameters())
```
This ensures the optimizer receives a dict of named parameters, allowing it to correctly detect VLM modules and apply the 1/10 LR rule.
This LR ratio is crucial for achieving strong and stable finetuning performance. This is already done for you by default.
❕Note
Completely matching the official reported performance may require an additional warm-up LR schedule for soft-prompts, which can bring minor improvements.
@@ -326,6 +264,26 @@ domain_id = 3
The domain_id is automatically added to observations by the `XVLAAddDomainIdProcessorStep` in the preprocessing pipeline.
The `lerobot/xvla-base` model has been trained on the following domain IDs. It is recommended to choose one that most resembles your robot/configuration:
#### Fine-tuning Datasets
| Dataset Name | Domain ID |
| ---------------- | --------- |
| Bridge | 0 |
| RT1 | 1 |
| Calvin | 2 |
| libero | 3 |
| widowx-air | 4 |
| AIR-AGILEX-HQ | 5 |
| robotwin2_abs_ee | 6 |
| robotwin2_clean | 6 |
| robocasa-human | 7 |
| VLABench | 8 |
| AGIBOT-challenge | 9 |
| AIR-AGILEX | 10 |
| AIRBOT | 18 |
### 3. Processor Steps
X-VLA requires specific preprocessing and postprocessing steps for proper operation.
-20
View File
@@ -1,20 +0,0 @@
Goal:
Create a high and low policy, the high policy take obs + text and return text, its a VLM
The low policy is a VLA that take text returned by the high and ouptut actions
High policy is ran every one second or when user send prompt
Synthetic data generation:
D demo -> teleop data with a global task annotation
D label -> segment data into short skills (one to three seconds)
D syn: p-gen will create the high level prompt user might gave to p-hi
Given D label prompt p-gen to imagine appropriate action, take images, ALL PRIOR skill labels in the episode: ℓ̂₀, …, ℓ̂ₜ₋₁
“Given the scene + all previous steps + current needed skill ℓ̂₅,
generate a user request that logically leads to ℓ̂₅.”
Train:
Phi(lt| images, global label) cross entropy - next token predictions
Plow(At| images, qt, lt)
@@ -1,141 +0,0 @@
#!/bin/bash
#SBATCH --time=24:00:00
#SBATCH --partition=hopper-cpu
#SBATCH --cpus-per-task=96
#SBATCH --output=/fsx/jade_choghari/logs/launcher_%j.out
#SBATCH --error=/fsx/jade_choghari/logs/launcher_%j.err
# Activate conda environment
# Load conda
source /fsx/jade_choghari/miniforge3/etc/profile.d/conda.sh
conda activate lerobot
set -e # Exit on error
# Input dataset
REPO_ID="lerobot"
ROOT="/fsx/jade_choghari/vlabench-primitive/"
# Output paths
OUTPUT_DIR="/fsx/jade_choghari/vlabench-primitive-encoded"
OUTPUT_REPO_ID="vlabench-primitive-encoded"
LOGS_DIR="/fsx/jade_choghari/logs/convert_video"
# Video encoding settings
VCODEC="libsvtav1"
PIX_FMT="yuv420p"
GOP_SIZE=2
CRF=30
FAST_DECODE=0
# Parallelization settings
NUM_WORKERS=24 # Number of parallel SLURM workers
NUM_IMAGE_WORKERS=4 # Threads per worker for image saving
# SLURM settings
PARTITION="hopper-cpu" # Change to your CPU partition name
CPUS_PER_TASK=32 # CPUs per worker
MEM_PER_CPU="4G" # Memory per CPU
TIME_LIMIT="24:00:00" # Time limit per job
###############################################################################
# STEP 1: Parallel Video Conversion
###############################################################################
rm -rf "${OUTPUT_DIR}"
mkdir -p "${OUTPUT_DIR}"
echo "=============================================="
echo "STEP 1: Starting parallel video conversion"
echo " Workers: ${NUM_WORKERS}"
echo " Input: ${REPO_ID} (root: ${ROOT})"
echo " Output: ${OUTPUT_DIR}"
echo "=============================================="
python /admin/home/jade_choghari/lerobot/examples/port_datasets/slurm_convert_to_video.py\
--repo-id "${REPO_ID}" \
--root "${ROOT}" \
--output-dir "${OUTPUT_DIR}" \
--output-repo-id "${OUTPUT_REPO_ID}" \
--vcodec "${VCODEC}" \
--pix-fmt "${PIX_FMT}" \
--g ${GOP_SIZE} \
--crf ${CRF} \
--fast-decode ${FAST_DECODE} \
--num-image-workers ${NUM_IMAGE_WORKERS} \
--logs-dir "${LOGS_DIR}" \
--job-name "convert_video" \
--slurm 1 \
--workers ${NUM_WORKERS} \
--partition "${PARTITION}" \
--cpus-per-task ${CPUS_PER_TASK} \
--mem-per-cpu "${MEM_PER_CPU}" \
--time-limit "${TIME_LIMIT}"
echo ""
echo "✓ Parallel conversion jobs submitted!"
echo " Monitor with: squeue -u \$USER"
echo " Check logs in: ${LOGS_DIR}/convert_video"
echo ""
echo "Wait for all jobs to complete before running Step 2."
echo "You can check completion with: squeue -u \$USER | grep convert_video"
echo ""
echo "After all jobs complete, run Step 2 to aggregate shards:"
echo " bash convert_to_video_parallel.sh aggregate"
###############################################################################
# STEP 2: Aggregate Shards (run this after Step 1 completes)
###############################################################################
if [ "$1" == "aggregate" ]; then
echo ""
echo "=============================================="
echo "STEP 2: Aggregating video shards"
echo " Shards: ${NUM_WORKERS}"
echo " Input: ${OUTPUT_DIR}/shard_XXXX"
echo " Output: ${OUTPUT_DIR}_final"
echo "=============================================="
python slurm_aggregate_video_shards.py \
--shards-dir "${OUTPUT_DIR}" \
--output-dir "${OUTPUT_DIR}_final" \
--output-repo-id "${OUTPUT_REPO_ID}" \
--num-shards ${NUM_WORKERS} \
--logs-dir "${LOGS_DIR}" \
--job-name "aggregate_video" \
--slurm 1 \
--partition "${PARTITION}" \
--cpus-per-task 16 \
--mem-per-cpu "8G" \
--time-limit "08:00:00"
echo ""
echo "✓ Aggregation job submitted!"
echo " Monitor with: squeue -u \$USER | grep aggregate_video"
echo " Check logs in: ${LOGS_DIR}/aggregate_video"
echo ""
echo "After completion, your final dataset will be in:"
echo " ${OUTPUT_DIR}_final"
fi
###############################################################################
# Helpful information
###############################################################################
if [ "$1" != "aggregate" ]; then
echo ""
echo "=============================================="
echo "WORKFLOW SUMMARY"
echo "=============================================="
echo ""
echo "1. Step 1 is now running - it will:"
echo " - Split episodes across ${NUM_WORKERS} workers"
echo " - Each worker converts its episodes to video"
echo " - Creates shard datasets in ${OUTPUT_DIR}/shard_XXXX"
echo ""
echo "2. After Step 1 completes, run Step 2:"
echo " bash convert_to_video_parallel.sh aggregate"
echo ""
echo "3. Step 2 will merge all shards into a single dataset"
echo ""
echo "=============================================="
fi
@@ -1,266 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Aggregate video dataset shards into a single dataset.
After parallel conversion using slurm_convert_to_video.py, this script merges
all the shard datasets into one final dataset.
Example usage:
python slurm_aggregate_video_shards.py \
--shards-dir /fsx/jade_choghari/libero_video \
--output-dir /fsx/jade_choghari/libero_video_final \
--output-repo-id lerobot_video \
--num-workers 100 \
--partition cpu_partition \
--cpus-per-task 16
"""
import argparse
import logging
from pathlib import Path
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
class AggregateVideoShards(PipelineStep):
"""Pipeline step that aggregates video dataset shards."""
def __init__(
self,
shards_dir: str | Path,
output_dir: str | Path,
output_repo_id: str,
num_shards: int,
):
super().__init__()
self.shards_dir = Path(shards_dir)
self.output_dir = Path(output_dir)
self.output_repo_id = output_repo_id
self.num_shards = num_shards
def run(self, data=None, rank: int = 0, world_size: int = 1):
"""Aggregate all shards into a single dataset."""
from lerobot.datasets.dataset_tools import merge_datasets
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.utils import init_logging
init_logging()
# Only worker 0 performs aggregation
if rank != 0:
logging.info(f"Worker {rank} skipping - only worker 0 performs aggregation")
return
logging.info(f"Starting aggregation of {self.num_shards} shards")
# Collect all shard datasets
shard_datasets = []
for shard_idx in range(self.num_shards):
shard_dir = self.shards_dir / f"shard_{shard_idx:04d}"
if not shard_dir.exists():
logging.warning(f"Shard directory not found: {shard_dir}")
continue
# Find the repo_id for this shard
shard_repo_id = f"{self.output_repo_id}_shard_{shard_idx:04d}"
try:
shard_dataset = LeRobotDataset(shard_repo_id, root=shard_dir)
shard_datasets.append(shard_dataset)
logging.info(
f"Loaded shard {shard_idx}: {shard_dataset.meta.total_episodes} episodes, "
f"{shard_dataset.meta.total_frames} frames"
)
except Exception as e:
logging.error(f"Failed to load shard {shard_idx}: {e}")
continue
if len(shard_datasets) == 0:
raise ValueError(f"No valid shards found in {self.shards_dir}")
logging.info(f"Successfully loaded {len(shard_datasets)} shards, starting merge")
# Merge all shards
self.output_dir.mkdir(parents=True, exist_ok=True)
merged_dataset = merge_datasets(
shard_datasets,
output_repo_id=self.output_repo_id,
output_dir=self.output_dir,
)
logging.info("✓ Aggregation complete!")
logging.info(f"Merged dataset saved to: {self.output_dir}")
logging.info(f"Total episodes: {merged_dataset.meta.total_episodes}")
logging.info(f"Total frames: {merged_dataset.meta.total_frames}")
def make_aggregate_executor(
shards_dir,
output_dir,
output_repo_id,
num_shards,
job_name,
logs_dir,
partition,
cpus_per_task,
mem_per_cpu,
time_limit,
slurm=True,
):
"""Create executor for shard aggregation."""
kwargs = {
"pipeline": [
AggregateVideoShards(
shards_dir=shards_dir,
output_dir=output_dir,
output_repo_id=output_repo_id,
num_shards=num_shards,
),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
# Only need 1 worker for aggregation
kwargs.update(
{
"job_name": job_name,
"tasks": 1,
"workers": 1,
"time": time_limit,
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": 1,
"workers": 1,
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser(
description="Aggregate video dataset shards into a single dataset",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--shards-dir",
type=Path,
required=True,
help="Directory containing shard_XXXX subdirectories",
)
parser.add_argument(
"--output-dir",
type=Path,
required=True,
help="Output directory for the aggregated dataset",
)
parser.add_argument(
"--output-repo-id",
type=str,
required=True,
help="Repository ID for the aggregated dataset",
)
parser.add_argument(
"--num-shards",
type=int,
required=True,
help="Number of shards to aggregate (should match --workers from conversion)",
)
parser.add_argument(
"--logs-dir",
type=Path,
required=True,
help="Path to logs directory for datatrove",
)
parser.add_argument(
"--job-name",
type=str,
default="aggregate_video_shards",
help="Job name for SLURM",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over SLURM (1) or locally (0)",
)
parser.add_argument(
"--partition",
type=str,
required=True,
help="SLURM partition (use CPU partition)",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=16,
help="Number of CPUs per task (aggregation can use more CPUs)",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="8G",
help="Memory per CPU",
)
parser.add_argument(
"--time-limit",
type=str,
default="08:00:00",
help="Time limit for SLURM job",
)
args = parser.parse_args()
# Convert slurm flag to boolean
slurm = args.slurm == 1
# Create and run executor
executor = make_aggregate_executor(
shards_dir=args.shards_dir,
output_dir=args.output_dir,
output_repo_id=args.output_repo_id,
num_shards=args.num_shards,
job_name=args.job_name,
logs_dir=args.logs_dir,
partition=args.partition,
cpus_per_task=args.cpus_per_task,
mem_per_cpu=args.mem_per_cpu,
time_limit=args.time_limit,
slurm=slurm,
)
logging.info("Starting shard aggregation")
executor.run()
logging.info("Aggregation job submitted/completed")
if __name__ == "__main__":
main()
@@ -1,539 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Parallelize video conversion using SLURM and Datatrove.
This script converts an image-based LeRobot dataset to video format by distributing
episodes across multiple workers for parallel processing.
Example usage:
python slurm_convert_to_video.py \
--repo-id lerobot \
--root /fsx/jade_choghari/libero/ \
--output-dir /fsx/jade_choghari/libero_video \
--output-repo-id lerobot_video \
--workers 100 \
--partition cpu_partition \
--cpus-per-task 8 \
--logs-dir ./logs
"""
import argparse
import logging
from pathlib import Path
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
class ConvertEpisodesToVideo(PipelineStep):
"""Pipeline step that converts episodes to videos in parallel."""
def __init__(
self,
repo_id: str,
root: str | None,
output_dir: str,
output_repo_id: str | None,
vcodec: str = "libsvtav1",
pix_fmt: str = "yuv420p",
g: int = 2,
crf: int = 30,
fast_decode: int = 0,
num_image_workers: int = 4,
):
super().__init__()
self.repo_id = repo_id
self.root = root
self.output_dir = Path(output_dir)
self.output_repo_id = output_repo_id or f"{repo_id}_video"
self.vcodec = vcodec
self.pix_fmt = pix_fmt
self.g = g
self.crf = crf
self.fast_decode = fast_decode
self.num_image_workers = num_image_workers
def run(self, data=None, rank: int = 0, world_size: int = 1):
"""Process a shard of episodes."""
from datasets.utils.tqdm import disable_progress_bars
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.scripts.lerobot_edit_dataset import (
encode_episode_videos,
)
from lerobot.utils.utils import init_logging
import logging
init_logging()
disable_progress_bars()
logging.info(f"Worker {rank}/{world_size} starting video conversion")
# Load source dataset
dataset = LeRobotDataset(self.repo_id, root=self.root)
total_episodes = dataset.meta.total_episodes
# Determine which episodes this worker processes
episodes_per_worker = total_episodes // world_size
remainder = total_episodes % world_size
if rank < remainder:
# First 'remainder' workers get one extra episode
start_ep = rank * (episodes_per_worker + 1)
end_ep = start_ep + episodes_per_worker + 1
else:
start_ep = remainder * (episodes_per_worker + 1) + (rank - remainder) * episodes_per_worker
end_ep = start_ep + episodes_per_worker
episode_indices = list(range(start_ep, end_ep))
logging.info(
f"Worker {rank} processing episodes {start_ep} to {end_ep-1} ({len(episode_indices)} episodes)"
)
if len(episode_indices) == 0:
logging.info(f"Worker {rank} has no episodes to process")
return
# Create shard-specific output directory
import shutil
shard_output_dir = self.output_dir / f"shard_{rank:04d}"
# Remove existing directory to avoid conflicts with LeRobotDatasetMetadata.create
if shard_output_dir.exists():
logging.warning(f"Shard directory {shard_output_dir} already exists, removing it")
shutil.rmtree(shard_output_dir)
# Import conversion function
from lerobot.scripts.lerobot_edit_dataset import convert_dataset_to_videos
logging.info(
f"Worker {rank} converting {len(episode_indices)} episodes with codec {self.vcodec}, CRF {self.crf}"
)
# Convert this shard's episodes with remapped indices
# We need to remap episode indices to start from 0 for proper file structure
self._convert_shard_to_videos(
dataset=dataset,
shard_output_dir=shard_output_dir,
shard_repo_id=f"{self.output_repo_id}_shard_{rank:04d}",
episode_indices=episode_indices,
)
logging.info(f"Worker {rank} completed successfully")
def _convert_shard_to_videos(
self,
dataset,
shard_output_dir,
shard_repo_id,
episode_indices,
):
"""Convert a shard's episodes to videos with proper index remapping."""
import shutil
import pandas as pd
from tqdm import tqdm
from lerobot.datasets.lerobot_dataset import LeRobotDatasetMetadata
from lerobot.datasets.utils import write_info, write_stats, write_tasks
from lerobot.datasets.video_utils import encode_video_frames, get_video_info
from lerobot.scripts.lerobot_edit_dataset import (
save_episode_images_for_video,
_copy_data_without_images,
)
from lerobot.utils.constants import OBS_IMAGE
# Check that it's an image dataset
if len(dataset.meta.video_keys) > 0:
raise ValueError(
f"This operation is for image datasets only. Video dataset provided: {dataset.repo_id}"
)
# Get all image keys
hf_dataset = dataset.hf_dataset.with_format(None)
img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)]
if len(img_keys) == 0:
raise ValueError(f"No image keys found in dataset {dataset.repo_id}")
logging.info(f"Converting {len(episode_indices)} episodes with {len(img_keys)} cameras")
# Create new features dict, converting image features to video features
new_features = {}
for key, value in dataset.meta.features.items():
if key not in img_keys:
new_features[key] = value
else:
# Convert image key to video format
new_features[key] = value.copy()
new_features[key]["dtype"] = "video"
# Create new metadata for video dataset
new_meta = LeRobotDatasetMetadata.create(
repo_id=shard_repo_id,
fps=dataset.meta.fps,
features=new_features,
robot_type=dataset.meta.robot_type,
root=shard_output_dir,
use_videos=True,
chunks_size=dataset.meta.chunks_size,
data_files_size_in_mb=dataset.meta.data_files_size_in_mb,
video_files_size_in_mb=dataset.meta.video_files_size_in_mb,
)
# Create temporary directory for image extraction
temp_dir = shard_output_dir / "temp_images"
temp_dir.mkdir(parents=True, exist_ok=True)
# Process each episode with REMAPPED indices (0, 1, 2, ...)
all_episode_metadata = []
fps = int(dataset.fps)
try:
for new_ep_idx, orig_ep_idx in enumerate(tqdm(episode_indices, desc="Converting episodes")):
# Get episode metadata from source using ORIGINAL index
src_episode = dataset.meta.episodes[orig_ep_idx]
episode_length = src_episode["length"]
episode_duration = episode_length / dataset.fps
video_metadata = {}
# Encode videos for each camera
for img_key in img_keys:
# Save images temporarily using ORIGINAL episode index
imgs_dir = temp_dir / f"episode_{orig_ep_idx:06d}" / img_key
save_episode_images_for_video(
dataset, imgs_dir, img_key, orig_ep_idx, self.num_image_workers
)
# Determine chunk and file indices using NEW (remapped) episode index
chunk_idx = new_ep_idx // new_meta.chunks_size
file_idx = new_ep_idx % new_meta.chunks_size
# Create video path in the new dataset structure
video_path = new_meta.root / new_meta.video_path.format(
video_key=img_key, chunk_index=chunk_idx, file_index=file_idx
)
video_path.parent.mkdir(parents=True, exist_ok=True)
# Encode video
encode_video_frames(
imgs_dir=imgs_dir,
video_path=video_path,
fps=fps,
vcodec=self.vcodec,
pix_fmt=self.pix_fmt,
g=self.g,
crf=self.crf,
fast_decode=self.fast_decode,
overwrite=True,
)
# Clean up temporary images
shutil.rmtree(imgs_dir)
# Store video metadata
video_metadata[img_key] = {
f"videos/{img_key}/chunk_index": chunk_idx,
f"videos/{img_key}/file_index": file_idx,
f"videos/{img_key}/from_timestamp": 0.0,
f"videos/{img_key}/to_timestamp": episode_duration,
}
# Build episode metadata using NEW index
episode_meta = {
"episode_index": new_ep_idx,
"length": episode_length,
"dataset_from_index": new_ep_idx * episode_length,
"dataset_to_index": (new_ep_idx + 1) * episode_length,
}
# Add video metadata
for img_key in img_keys:
episode_meta.update(video_metadata[img_key])
# Add data chunk/file info
if "data/chunk_index" in src_episode:
episode_meta["data/chunk_index"] = src_episode["data/chunk_index"]
episode_meta["data/file_index"] = src_episode["data/file_index"]
all_episode_metadata.append(episode_meta)
# Copy and transform data files (removing image columns)
_copy_data_without_images(dataset, new_meta, episode_indices, img_keys)
# Save episode metadata
episodes_df = pd.DataFrame(all_episode_metadata)
episodes_path = new_meta.root / "meta" / "episodes" / "chunk-000" / "file-000.parquet"
episodes_path.parent.mkdir(parents=True, exist_ok=True)
episodes_df.to_parquet(episodes_path, index=False)
# Update metadata info
new_meta.info["total_episodes"] = len(episode_indices)
new_meta.info["total_frames"] = sum(ep["length"] for ep in all_episode_metadata)
new_meta.info["total_tasks"] = dataset.meta.total_tasks
new_meta.info["splits"] = {"train": f"0:{len(episode_indices)}"}
# Update video info for all image keys using the actual first video file
for img_key in img_keys:
if not new_meta.features[img_key].get("info", None):
# Use the first actually created video file
chunk_idx = all_episode_metadata[0][f"videos/{img_key}/chunk_index"]
file_idx = all_episode_metadata[0][f"videos/{img_key}/file_index"]
video_path = new_meta.root / new_meta.video_path.format(
video_key=img_key, chunk_index=chunk_idx, file_index=file_idx
)
new_meta.info["features"][img_key]["info"] = get_video_info(video_path)
write_info(new_meta.info, new_meta.root)
# Copy stats and tasks
if dataset.meta.stats is not None:
# Remove image stats
new_stats = {k: v for k, v in dataset.meta.stats.items() if k not in img_keys}
write_stats(new_stats, new_meta.root)
if dataset.meta.tasks is not None:
write_tasks(dataset.meta.tasks, new_meta.root)
finally:
# Clean up temporary directory
if temp_dir.exists():
shutil.rmtree(temp_dir)
def make_convert_executor(
repo_id,
root,
output_dir,
output_repo_id,
vcodec,
pix_fmt,
g,
crf,
fast_decode,
num_image_workers,
job_name,
logs_dir,
workers,
partition,
cpus_per_task,
mem_per_cpu,
time_limit,
slurm=True,
):
"""Create executor for parallel video conversion."""
kwargs = {
"pipeline": [
ConvertEpisodesToVideo(
repo_id=repo_id,
root=root,
output_dir=output_dir,
output_repo_id=output_repo_id,
vcodec=vcodec,
pix_fmt=pix_fmt,
g=g,
crf=crf,
fast_decode=fast_decode,
num_image_workers=num_image_workers,
),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": workers, # Number of parallel tasks = number of workers
"workers": workers,
"time": time_limit,
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": workers,
"workers": 1, # Local mode: sequential
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser(
description="Parallelize video conversion across SLURM workers",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
# Input/Output paths
parser.add_argument(
"--repo-id",
type=str,
required=True,
help="Repository ID of the source image dataset",
)
parser.add_argument(
"--root",
type=str,
default=None,
help="Root directory containing the source dataset (default: HF cache)",
)
parser.add_argument(
"--output-dir",
type=str,
required=True,
help="Output directory for the video dataset",
)
parser.add_argument(
"--output-repo-id",
type=str,
default=None,
help="Repository ID for output dataset (default: <repo_id>_video)",
)
# Video encoding parameters
parser.add_argument(
"--vcodec",
type=str,
default="libsvtav1",
help="Video codec",
)
parser.add_argument(
"--pix-fmt",
type=str,
default="yuv420p",
help="Pixel format",
)
parser.add_argument(
"--g",
type=int,
default=2,
help="GOP size (group of pictures)",
)
parser.add_argument(
"--crf",
type=int,
default=30,
help="Constant rate factor (quality)",
)
parser.add_argument(
"--fast-decode",
type=int,
default=0,
help="Fast decode tuning",
)
parser.add_argument(
"--num-image-workers",
type=int,
default=4,
help="Number of threads per worker for saving images",
)
# SLURM parameters
parser.add_argument(
"--logs-dir",
type=Path,
required=True,
help="Path to logs directory for datatrove",
)
parser.add_argument(
"--job-name",
type=str,
default="convert_to_video",
help="Job name for SLURM",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over SLURM (1) or locally (0)",
)
parser.add_argument(
"--workers",
type=int,
default=100,
help="Number of parallel workers (each processes different episodes)",
)
parser.add_argument(
"--partition",
type=str,
required=True,
help="SLURM partition (use CPU partition)",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=8,
help="Number of CPUs per SLURM task",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="4G",
help="Memory per CPU",
)
parser.add_argument(
"--time-limit",
type=str,
default="08:00:00",
help="Time limit for SLURM job",
)
args = parser.parse_args()
# Convert slurm flag to boolean
slurm = args.slurm == 1
# Create and run executor
executor = make_convert_executor(
repo_id=args.repo_id,
root=args.root,
output_dir=args.output_dir,
output_repo_id=args.output_repo_id,
vcodec=args.vcodec,
pix_fmt=args.pix_fmt,
g=args.g,
crf=args.crf,
fast_decode=args.fast_decode,
num_image_workers=args.num_image_workers,
job_name=args.job_name,
logs_dir=args.logs_dir,
workers=args.workers,
partition=args.partition,
cpus_per_task=args.cpus_per_task,
mem_per_cpu=args.mem_per_cpu,
time_limit=args.time_limit,
slurm=slurm,
)
logging.info(f"Starting video conversion with {args.workers} workers")
executor.run()
logging.info("All workers submitted/completed")
if __name__ == "__main__":
main()
-7
View File
@@ -1,7 +0,0 @@
==============================================
STEP 1: Starting parallel video conversion
Workers: 2
Input: lerobot (root: /fsx/jade_choghari/vlabench-primitive/)
Output: /fsx/jade_choghari/vlabench-primitive-encoded
==============================================
python: can't open file '/admin/home/jade_choghari/lerobot/slurm_convert_to_video.py': [Errno 2] No such file or directory
+4
View File
@@ -81,10 +81,14 @@ class AdamWConfig(OptimizerConfig):
eps: float = 1e-8
weight_decay: float = 1e-2
grad_clip_norm: float = 10.0
fused: bool = False
def build(self, params: dict) -> torch.optim.Optimizer:
kwargs = asdict(self)
kwargs.pop("grad_clip_norm")
# Fused optimizer only works on CUDA
if kwargs.get("fused") and not torch.cuda.is_available():
kwargs["fused"] = False
return torch.optim.AdamW(params, **kwargs)
@@ -136,6 +136,7 @@ class ACTConfig(PreTrainedConfig):
optimizer_lr: float = 1e-5
optimizer_weight_decay: float = 1e-4
optimizer_lr_backbone: float = 1e-5
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
def __post_init__(self):
super().__post_init__()
@@ -164,6 +165,7 @@ class ACTConfig(PreTrainedConfig):
return AdamWConfig(
lr=self.optimizer_lr,
weight_decay=self.optimizer_weight_decay,
fused=self.optimizer_fused,
)
def get_scheduler_preset(self) -> None:
@@ -94,6 +94,7 @@ class GrootConfig(PreTrainedConfig):
optimizer_betas: tuple[float, float] = (0.95, 0.999)
optimizer_eps: float = 1e-8
optimizer_weight_decay: float = 1e-5
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
warmup_ratio: float = 0.05
use_bf16: bool = True
@@ -174,6 +175,7 @@ class GrootConfig(PreTrainedConfig):
betas=self.optimizer_betas,
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
fused=self.optimizer_fused,
)
def get_scheduler_preset(self) -> CosineDecayWithWarmupSchedulerConfig:
@@ -23,6 +23,8 @@ from lerobot.optim.schedulers import CosineDecayWithWarmupSchedulerConfig
from lerobot.policies.rtc.configuration_rtc import RTCConfig
from lerobot.utils.constants import OBS_IMAGES
DEFAULT_IMAGE_SIZE = 224
@PreTrainedConfig.register_subclass("pi0")
@dataclass
@@ -51,7 +53,10 @@ class PI0Config(PreTrainedConfig):
# Real-Time Chunking (RTC) configuration
rtc_config: RTCConfig | None = None
image_resolution: tuple[int, int] = (224, 224) # see openpi `preprocessing_pytorch.py`
image_resolution: tuple[int, int] = (
DEFAULT_IMAGE_SIZE,
DEFAULT_IMAGE_SIZE,
) # see openpi `preprocessing_pytorch.py`
# Add empty images. Used to add empty cameras when no image features are present.
empty_cameras: int = 0
@@ -69,6 +74,7 @@ class PI0Config(PreTrainedConfig):
gradient_checkpointing: bool = False # Enable gradient checkpointing for memory optimization
compile_model: bool = False # Whether to use torch.compile for model optimization
compile_mode: str = "max-autotune" # Torch compile mode
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
device: str | None = None # Device to use for the model (None = auto-detect)
# Optimizer settings: see openpi `AdamW``
@@ -136,6 +142,7 @@ class PI0Config(PreTrainedConfig):
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
grad_clip_norm=self.optimizer_grad_clip_norm,
fused=self.optimizer_fused,
)
def get_scheduler_preset(self):
+14 -13
View File
@@ -41,7 +41,7 @@ else:
PaliGemmaForConditionalGeneration = None
from lerobot.configs.policies import PreTrainedConfig
from lerobot.policies.pi0.configuration_pi0 import PI0Config
from lerobot.policies.pi0.configuration_pi0 import DEFAULT_IMAGE_SIZE, PI0Config
from lerobot.policies.pretrained import PreTrainedPolicy, T
from lerobot.policies.rtc.modeling_rtc import RTCProcessor
from lerobot.utils.constants import (
@@ -337,6 +337,7 @@ class PaliGemmaWithExpertModel(
action_expert_config,
use_adarms=None,
precision: Literal["bfloat16", "float32"] = "bfloat16",
image_size: int = DEFAULT_IMAGE_SIZE,
):
if use_adarms is None:
use_adarms = [False, False]
@@ -356,6 +357,7 @@ class PaliGemmaWithExpertModel(
vlm_config_hf.text_config.vocab_size = 257152
vlm_config_hf.text_config.use_adarms = use_adarms[0]
vlm_config_hf.text_config.adarms_cond_dim = vlm_config.width if use_adarms[0] else None
vlm_config_hf.vision_config.image_size = image_size
vlm_config_hf.vision_config.intermediate_size = 4304
vlm_config_hf.vision_config.projection_dim = 2048
vlm_config_hf.vision_config.projector_hidden_act = "gelu_fast"
@@ -519,11 +521,17 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
paligemma_config = get_gemma_config(config.paligemma_variant)
action_expert_config = get_gemma_config(config.action_expert_variant)
if config.image_resolution[0] != config.image_resolution[1]:
raise ValueError(
f"PaliGemma expects square image resolution, invalid resolution: {config.image_resolution}"
)
self.paligemma_with_expert = PaliGemmaWithExpertModel(
paligemma_config,
action_expert_config,
use_adarms=[False, False],
precision=config.dtype,
image_size=config.image_resolution[0],
)
self.action_in_proj = nn.Linear(config.max_action_dim, action_expert_config.width)
@@ -812,16 +820,13 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
)
dt = -1.0 / num_steps
dt = torch.tensor(dt, dtype=torch.float32, device=device)
x_t = noise
time = torch.tensor(1.0, dtype=torch.float32, device=device)
while time >= -dt / 2:
expanded_time = time.expand(bsize)
for step in range(num_steps):
time = 1.0 + step * dt
time_tensor = torch.tensor(time, dtype=torch.float32, device=device).expand(bsize)
# Define a closure function to properly capture expanded_time
# This avoids the lambda expression (E731) and loop variable binding (B023) issues
def denoise_step_partial_call(input_x_t, current_timestep=expanded_time):
def denoise_step_partial_call(input_x_t, current_timestep=time_tensor):
return self.denoise_step(
state=state,
prefix_pad_masks=prefix_pad_masks,
@@ -846,15 +851,11 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
else:
v_t = denoise_step_partial_call(x_t)
# Euler step
x_t += dt * v_t
x_t = x_t + dt * v_t
# Record x_t and v_t after Euler step
if self.rtc_processor is not None and self.rtc_processor.is_debug_enabled():
self.rtc_processor.track(time=time, x_t=x_t, v_t=v_t)
time += dt
return x_t
def denoise_step(
@@ -22,6 +22,8 @@ from lerobot.optim.optimizers import AdamWConfig
from lerobot.optim.schedulers import CosineDecayWithWarmupSchedulerConfig
from lerobot.policies.rtc.configuration_rtc import RTCConfig
DEFAULT_IMAGE_SIZE = 224
@PreTrainedConfig.register_subclass("pi05")
@dataclass
@@ -50,7 +52,10 @@ class PI05Config(PreTrainedConfig):
# Real-Time Chunking (RTC) configuration
rtc_config: RTCConfig | None = None
image_resolution: tuple[int, int] = (224, 224) # see openpi `preprocessing_pytorch.py`
image_resolution: tuple[int, int] = (
DEFAULT_IMAGE_SIZE,
DEFAULT_IMAGE_SIZE,
) # see openpi `preprocessing_pytorch.py`
# Add empty images. Used to add empty cameras when no image features are present.
empty_cameras: int = 0
@@ -69,6 +74,7 @@ class PI05Config(PreTrainedConfig):
gradient_checkpointing: bool = False # Enable gradient checkpointing for memory optimization
compile_model: bool = False # Whether to use torch.compile for model optimization
compile_mode: str = "max-autotune" # Torch compile mode
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
device: str | None = None # Device to use for the model (None = auto-detect)
# Optimizer settings: see openpi `AdamW`
@@ -136,6 +142,7 @@ class PI05Config(PreTrainedConfig):
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
grad_clip_norm=self.optimizer_grad_clip_norm,
fused=self.optimizer_fused,
)
def get_scheduler_preset(self):
+14 -13
View File
@@ -41,7 +41,7 @@ else:
PaliGemmaForConditionalGeneration = None
from lerobot.configs.policies import PreTrainedConfig
from lerobot.policies.pi05.configuration_pi05 import PI05Config
from lerobot.policies.pi05.configuration_pi05 import DEFAULT_IMAGE_SIZE, PI05Config
from lerobot.policies.pretrained import PreTrainedPolicy, T
from lerobot.policies.rtc.modeling_rtc import RTCProcessor
from lerobot.utils.constants import (
@@ -336,6 +336,7 @@ class PaliGemmaWithExpertModel(
action_expert_config,
use_adarms=None,
precision: Literal["bfloat16", "float32"] = "bfloat16",
image_size: int = DEFAULT_IMAGE_SIZE,
):
if use_adarms is None:
use_adarms = [False, False]
@@ -355,6 +356,7 @@ class PaliGemmaWithExpertModel(
vlm_config_hf.text_config.vocab_size = 257152
vlm_config_hf.text_config.use_adarms = use_adarms[0]
vlm_config_hf.text_config.adarms_cond_dim = vlm_config.width if use_adarms[0] else None
vlm_config_hf.vision_config.image_size = image_size
vlm_config_hf.vision_config.intermediate_size = 4304
vlm_config_hf.vision_config.projection_dim = 2048
vlm_config_hf.vision_config.projector_hidden_act = "gelu_fast"
@@ -518,11 +520,17 @@ class PI05Pytorch(nn.Module): # see openpi `PI0Pytorch`
paligemma_config = get_gemma_config(config.paligemma_variant)
action_expert_config = get_gemma_config(config.action_expert_variant)
if config.image_resolution[0] != config.image_resolution[1]:
raise ValueError(
f"PaliGemma expects square image resolution, invalid resolution: {config.image_resolution}"
)
self.paligemma_with_expert = PaliGemmaWithExpertModel(
paligemma_config,
action_expert_config,
use_adarms=[False, True],
precision=config.dtype,
image_size=config.image_resolution[0],
)
self.action_in_proj = nn.Linear(config.max_action_dim, action_expert_config.width)
@@ -787,16 +795,13 @@ class PI05Pytorch(nn.Module): # see openpi `PI0Pytorch`
)
dt = -1.0 / num_steps
dt = torch.tensor(dt, dtype=torch.float32, device=device)
x_t = noise
time = torch.tensor(1.0, dtype=torch.float32, device=device)
while time >= -dt / 2:
expanded_time = time.expand(bsize)
for step in range(num_steps):
time = 1.0 + step * dt
time_tensor = torch.tensor(time, dtype=torch.float32, device=device).expand(bsize)
# Define a closure function to properly capture expanded_time
# This avoids the lambda expression (E731) and loop variable binding (B023) issues
def denoise_step_partial_call(input_x_t, current_timestep=expanded_time):
def denoise_step_partial_call(input_x_t, current_timestep=time_tensor):
return self.denoise_step(
prefix_pad_masks=prefix_pad_masks,
past_key_values=past_key_values,
@@ -820,15 +825,11 @@ class PI05Pytorch(nn.Module): # see openpi `PI0Pytorch`
else:
v_t = denoise_step_partial_call(x_t)
# Euler step
x_t += dt * v_t
x_t = x_t + dt * v_t
# Record x_t and v_t after Euler step
if self.rtc_processor is not None and self.rtc_processor.is_debug_enabled():
self.rtc_processor.track(time=time, x_t=x_t, v_t=v_t)
time += dt
return x_t
def denoise_step(
@@ -79,6 +79,7 @@ class SmolVLAConfig(PreTrainedConfig):
optimizer_eps: float = 1e-8
optimizer_weight_decay: float = 1e-10
optimizer_grad_clip_norm: float = 10
optimizer_fused: bool = False
scheduler_warmup_steps: int = 1_000
scheduler_decay_steps: int = 30_000
@@ -136,6 +137,7 @@ class SmolVLAConfig(PreTrainedConfig):
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
grad_clip_norm=self.optimizer_grad_clip_norm,
fused=self.optimizer_fused,
)
def get_scheduler_preset(self):
@@ -783,18 +783,15 @@ class VLAFlowMatching(nn.Module):
use_cache=self.config.use_cache,
fill_kv_cache=True,
)
dt = -1.0 / self.config.num_steps
dt = torch.tensor(dt, dtype=torch.float32, device=device)
num_steps = self.config.num_steps
dt = -1.0 / num_steps
x_t = noise
time = torch.tensor(1.0, dtype=torch.float32, device=device)
for step in range(num_steps):
time = 1.0 + step * dt
time_tensor = torch.tensor(time, dtype=torch.float32, device=device).expand(bsize)
while time >= -dt / 2:
expanded_time = time.expand(bsize)
# Define a closure function to properly capture expanded_time
# This avoids the lambda expression (E731) and loop variable binding (B023) issues
def denoise_step_partial_call(input_x_t, current_timestep=expanded_time):
def denoise_step_partial_call(input_x_t, current_timestep=time_tensor):
return self.denoise_step(
x_t=input_x_t,
prefix_pad_masks=prefix_pad_masks,
@@ -818,15 +815,11 @@ class VLAFlowMatching(nn.Module):
else:
v_t = denoise_step_partial_call(x_t)
# Euler step
x_t += dt * v_t
x_t = x_t + dt * v_t
# Record x_t and v_t after Euler step (other params are recorded in rtc_processor.denoise_step)
if self.rtc_processor is not None and self.rtc_processor.is_debug_enabled():
self.rtc_processor.track(time=time, x_t=x_t, v_t=v_t)
time += dt
return x_t
def denoise_step(