Compare commits

..

6 Commits

Author SHA1 Message Date
Jade Choghari 6216932fb0 more changres 2025-12-09 08:57:49 +00:00
Jade Choghari 5fab1ed5cd Merge branch 'main' into feat/data-images2video 2025-12-06 10:38:00 +01:00
Jade Choghari 8b7c46c5f7 more fixes 2025-12-05 20:37:27 +00:00
Jade Choghari ba97f64afd make it work 2025-12-01 14:45:23 +01:00
Jade Choghari 8d861fe94b style 2025-12-01 13:47:15 +01:00
Jade Choghari d22fa6446b add video encoding tool 2025-12-01 13:46:22 +01:00
22 changed files with 1101 additions and 112 deletions
@@ -31,8 +31,7 @@ jobs:
name: Upload Preview and Comment
if: >
github.event.workflow_run.event == 'pull_request' &&
github.event.workflow_run.conclusion == 'success' &&
github.repository == 'huggingface/lerobot'
github.event.workflow_run.conclusion == 'success'
uses: huggingface/doc-builder/.github/workflows/upload_pr_documentation.yml@main
with:
package_name: lerobot
+2 -4
View File
@@ -42,9 +42,7 @@ jobs:
# This job builds and deploys the official documentation.
build_main_docs:
name: Build Main Docs
if: >
(github.event_name == 'push' || github.event_name == 'workflow_dispatch') &&
github.repository == 'huggingface/lerobot'
if: github.event_name == 'push' || github.event_name == 'workflow_dispatch'
permissions:
contents: read
uses: huggingface/doc-builder/.github/workflows/build_main_documentation.yml@main
@@ -60,7 +58,7 @@ jobs:
# The result of this job triggers the 'Upload PR Documentation' workflow.
build_pr_docs:
name: Build PR Docs
if: github.event_name == 'pull_request' && github.repository == 'huggingface/lerobot'
if: github.event_name == 'pull_request'
permissions:
contents: read
pull-requests: write
+1
View File
@@ -45,6 +45,7 @@ permissions:
env:
UV_VERSION: "0.8.0"
PYTHON_VERSION: "3.10"
DOCKER_IMAGE_NAME: huggingface/lerobot-gpu
# Ensures that only the latest commit for a PR or branch is built, canceling older runs.
concurrency:
-2
View File
@@ -43,7 +43,6 @@ jobs:
name: Build CPU Docker for Nightly
runs-on:
group: aws-general-8-plus
if: github.repository == 'huggingface/lerobot'
outputs:
image_tag: ${{ env.DOCKER_IMAGE_NAME_CPU }}
steps:
@@ -78,7 +77,6 @@ jobs:
name: Build GPU Docker for Nightly
runs-on:
group: aws-general-8-plus
if: github.repository == 'huggingface/lerobot'
outputs:
image_tag: ${{ env.DOCKER_IMAGE_NAME_GPU }}
steps:
-1
View File
@@ -29,7 +29,6 @@ jobs:
build-and-publish:
name: Build and publish Python distributions
runs-on: ubuntu-latest
if: github.repository == 'huggingface/lerobot'
outputs:
version: ${{ steps.extract_info.outputs.tag_version }}
permissions:
-1
View File
@@ -45,7 +45,6 @@ jobs:
stale:
name: Close Stale Issues and PRs
runs-on: ubuntu-latest
if: github.repository == 'huggingface/lerobot'
permissions:
actions: write
contents: write # only for delete-branch option
-1
View File
@@ -43,7 +43,6 @@ jobs:
full-tests:
name: Full Unbound Tests
runs-on: ubuntu-latest
if: github.repository == 'huggingface/lerobot'
env:
MUJOCO_GL: egl
HF_HOME: /mnt/cache/.cache/huggingface
+82 -40
View File
@@ -24,7 +24,7 @@ Built from pure Transformer encoders, X-VLA scales naturally with model size and
<img
src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/lerobot/xvla-architecture2.png"
alt="XVLA Architecture 2"
style="width: 60%; height: auto;"
style="width: 32%; max-width: 450px; height: auto;"
/>
</p>
@@ -120,7 +120,7 @@ Adapted for Google Robot platforms.
### Recommended Training Configuration
When fine-tuning X-VLA for a new embodiment or task, we recommend not freezing the VLM, and also setting the `policy.dtype=bfloat16` to not hit OOM errors.
When fine-tuning X-VLA for a new embodiment or task, we recommend the following freezing strategy:
```bash
lerobot-train \
@@ -129,26 +129,25 @@ lerobot-train \
--job_name=xvla_training \
--policy.path="lerobot/xvla-base" \
--policy.repo_id="HF_USER/xvla-your-robot" \
--policy.dtype=bfloat16 \
--steps=3000 \
--policy.device=cuda \
--policy.freeze_vision_encoder=false \
--policy.freeze_language_encoder=false \
--policy.train_policy_transformer=true \
--policy.train_soft_prompts=true \
--policy.freeze_vision_encoder=True \
--policy.freeze_language_encoder=True \
--policy.train_policy_transformer=True \
--policy.train_soft_prompts=True \
--policy.action_mode=YOUR_ACTION_MODE
```
### Training Parameters Explained
| Parameter | Default | Description |
| -------------------------- | ------- | ---------------------------------------------- |
| `freeze_vision_encoder` | `false` | Do not freeze the VLM vision encoder weights |
| `freeze_language_encoder` | `false` | Do not freeze the VLM language encoder weights |
| `train_policy_transformer` | `true` | Allow policy transformer layers to train |
| `train_soft_prompts` | `true` | Allow soft prompts to train |
| Parameter | Default | Description |
| -------------------------- | ------- | ---------------------------------------- |
| `freeze_vision_encoder` | `True` | Freeze the VLM vision encoder weights |
| `freeze_language_encoder` | `True` | Freeze the VLM language encoder weights |
| `train_policy_transformer` | `True` | Allow policy transformer layers to train |
| `train_soft_prompts` | `True` | Allow soft prompts to train |
**💡 Best Practice**: For Phase II adaptation to new embodiments, do not freeze the VLM encoders and also train the policy transformer and soft prompts.
**💡 Best Practice**: For Phase II adaptation to new embodiments, freeze the VLM encoders and only train the policy transformer and soft prompts. This provides excellent sample efficiency with minimal compute.
### Example: Training on Bimanual Robot
@@ -158,15 +157,14 @@ lerobot-train \
--output_dir=./outputs/xvla_bimanual \
--job_name=xvla_so101_training \
--policy.path="lerobot/xvla-base" \
--policy.dtype=bfloat16 \
--policy.repo_id="YOUR_USERNAME/xvla-biso101" \
--steps=3000 \
--policy.device=cuda \
--policy.action_mode=so101_bimanual \
--policy.freeze_vision_encoder=false \
--policy.freeze_language_encoder=false \
--policy.train_policy_transformer=true \
--policy.train_soft_prompts=true
--policy.freeze_vision_encoder=True \
--policy.freeze_language_encoder=True \
--policy.train_policy_transformer=True \
--policy.train_soft_prompts=True
```
💡 **Best Performance:** If you have sufficient computational resources and want to achieve best X-VLA finetuning performance, you should follow the official finetuning strategy:
@@ -174,7 +172,71 @@ lerobot-train \
**🔥 Full-finetune all components with a custom learning-rate scheme**
To ensure stable optimization, the Vision-Language Model (VLM) must be trained with only 1/10 of the base learning rate, while all other components use the full LR.
This LR ratio is crucial for achieving strong and stable finetuning performance. This is already done for you by default.
This LR ratio is crucial for achieving strong and stable finetuning performance.
To enable this behavior, you must:
1. Implement a custom optimizer and register it in your training config
```
from dataclasses import dataclass, asdict
from lerobot.optim.optimizers import OptimizerConfig
import torch
@OptimizerConfig.register_subclass("xvla-adamw")
@dataclass
class XVLAAdamW(OptimizerConfig):
lr: float = 1e-4
betas: tuple[float, float] = (0.9, 0.99)
eps: float = 1e-8
weight_decay: float = 0.0
grad_clip_norm: float = 10.0
def build(self, params: dict) -> torch.optim.Optimizer:
"""
Expect `named_parameters()` as input.
Apply lr = lr / 10 for all VLM-related parameters.
"""
assert isinstance(params, dict), \
"Custom LR optimizer requires `named_parameters()` as inputs."
kwargs = asdict(self)
kwargs.pop("grad_clip_norm")
vlm_group, other_group = [], []
for name, p in params.items():
if not p.requires_grad:
continue
if "vlm" in name.lower():
vlm_group.append(p)
else:
other_group.append(p)
param_groups = [
{"params": vlm_group, "lr": self.lr * 0.1, "weight_decay": self.weight_decay * 0.1},
{"params": other_group, "lr": self.lr, "weight_decay": self.weight_decay},
]
return torch.optim.AdamW(param_groups, **kwargs)
```
2. Modify X-VLAs get_optim_params to return named parameters
Replace:
```
def get_optim_params(self) -> dict:
"""Return only trainable parameters for optimization."""
return filter(lambda p: p.requires_grad, self.parameters())
```
with:
```
def get_optim_params(self):
"""Return trainable named parameters."""
return filter(lambda kv: kv[1].requires_grad, self.named_parameters())
```
This ensures the optimizer receives a dict of named parameters, allowing it to correctly detect VLM modules and apply the 1/10 LR rule.
❕Note
Completely matching the official reported performance may require an additional warm-up LR schedule for soft-prompts, which can bring minor improvements.
@@ -264,26 +326,6 @@ domain_id = 3
The domain_id is automatically added to observations by the `XVLAAddDomainIdProcessorStep` in the preprocessing pipeline.
The `lerobot/xvla-base` model has been trained on the following domain IDs. It is recommended to choose one that most resembles your robot/configuration:
#### Fine-tuning Datasets
| Dataset Name | Domain ID |
| ---------------- | --------- |
| Bridge | 0 |
| RT1 | 1 |
| Calvin | 2 |
| libero | 3 |
| widowx-air | 4 |
| AIR-AGILEX-HQ | 5 |
| robotwin2_abs_ee | 6 |
| robotwin2_clean | 6 |
| robocasa-human | 7 |
| VLABench | 8 |
| AGIBOT-challenge | 9 |
| AIR-AGILEX | 10 |
| AIRBOT | 18 |
### 3. Processor Steps
X-VLA requires specific preprocessing and postprocessing steps for proper operation.
+20
View File
@@ -0,0 +1,20 @@
Goal:
Create a high and low policy, the high policy take obs + text and return text, its a VLM
The low policy is a VLA that take text returned by the high and ouptut actions
High policy is ran every one second or when user send prompt
Synthetic data generation:
D demo -> teleop data with a global task annotation
D label -> segment data into short skills (one to three seconds)
D syn: p-gen will create the high level prompt user might gave to p-hi
Given D label prompt p-gen to imagine appropriate action, take images, ALL PRIOR skill labels in the episode: ℓ̂₀, …, ℓ̂ₜ₋₁
“Given the scene + all previous steps + current needed skill ℓ̂₅,
generate a user request that logically leads to ℓ̂₅.”
Train:
Phi(lt| images, global label) cross entropy - next token predictions
Plow(At| images, qt, lt)
@@ -0,0 +1,141 @@
#!/bin/bash
#SBATCH --time=24:00:00
#SBATCH --partition=hopper-cpu
#SBATCH --cpus-per-task=96
#SBATCH --output=/fsx/jade_choghari/logs/launcher_%j.out
#SBATCH --error=/fsx/jade_choghari/logs/launcher_%j.err
# Activate conda environment
# Load conda
source /fsx/jade_choghari/miniforge3/etc/profile.d/conda.sh
conda activate lerobot
set -e # Exit on error
# Input dataset
REPO_ID="lerobot"
ROOT="/fsx/jade_choghari/vlabench-primitive/"
# Output paths
OUTPUT_DIR="/fsx/jade_choghari/vlabench-primitive-encoded"
OUTPUT_REPO_ID="vlabench-primitive-encoded"
LOGS_DIR="/fsx/jade_choghari/logs/convert_video"
# Video encoding settings
VCODEC="libsvtav1"
PIX_FMT="yuv420p"
GOP_SIZE=2
CRF=30
FAST_DECODE=0
# Parallelization settings
NUM_WORKERS=24 # Number of parallel SLURM workers
NUM_IMAGE_WORKERS=4 # Threads per worker for image saving
# SLURM settings
PARTITION="hopper-cpu" # Change to your CPU partition name
CPUS_PER_TASK=32 # CPUs per worker
MEM_PER_CPU="4G" # Memory per CPU
TIME_LIMIT="24:00:00" # Time limit per job
###############################################################################
# STEP 1: Parallel Video Conversion
###############################################################################
rm -rf "${OUTPUT_DIR}"
mkdir -p "${OUTPUT_DIR}"
echo "=============================================="
echo "STEP 1: Starting parallel video conversion"
echo " Workers: ${NUM_WORKERS}"
echo " Input: ${REPO_ID} (root: ${ROOT})"
echo " Output: ${OUTPUT_DIR}"
echo "=============================================="
python /admin/home/jade_choghari/lerobot/examples/port_datasets/slurm_convert_to_video.py\
--repo-id "${REPO_ID}" \
--root "${ROOT}" \
--output-dir "${OUTPUT_DIR}" \
--output-repo-id "${OUTPUT_REPO_ID}" \
--vcodec "${VCODEC}" \
--pix-fmt "${PIX_FMT}" \
--g ${GOP_SIZE} \
--crf ${CRF} \
--fast-decode ${FAST_DECODE} \
--num-image-workers ${NUM_IMAGE_WORKERS} \
--logs-dir "${LOGS_DIR}" \
--job-name "convert_video" \
--slurm 1 \
--workers ${NUM_WORKERS} \
--partition "${PARTITION}" \
--cpus-per-task ${CPUS_PER_TASK} \
--mem-per-cpu "${MEM_PER_CPU}" \
--time-limit "${TIME_LIMIT}"
echo ""
echo "✓ Parallel conversion jobs submitted!"
echo " Monitor with: squeue -u \$USER"
echo " Check logs in: ${LOGS_DIR}/convert_video"
echo ""
echo "Wait for all jobs to complete before running Step 2."
echo "You can check completion with: squeue -u \$USER | grep convert_video"
echo ""
echo "After all jobs complete, run Step 2 to aggregate shards:"
echo " bash convert_to_video_parallel.sh aggregate"
###############################################################################
# STEP 2: Aggregate Shards (run this after Step 1 completes)
###############################################################################
if [ "$1" == "aggregate" ]; then
echo ""
echo "=============================================="
echo "STEP 2: Aggregating video shards"
echo " Shards: ${NUM_WORKERS}"
echo " Input: ${OUTPUT_DIR}/shard_XXXX"
echo " Output: ${OUTPUT_DIR}_final"
echo "=============================================="
python slurm_aggregate_video_shards.py \
--shards-dir "${OUTPUT_DIR}" \
--output-dir "${OUTPUT_DIR}_final" \
--output-repo-id "${OUTPUT_REPO_ID}" \
--num-shards ${NUM_WORKERS} \
--logs-dir "${LOGS_DIR}" \
--job-name "aggregate_video" \
--slurm 1 \
--partition "${PARTITION}" \
--cpus-per-task 16 \
--mem-per-cpu "8G" \
--time-limit "08:00:00"
echo ""
echo "✓ Aggregation job submitted!"
echo " Monitor with: squeue -u \$USER | grep aggregate_video"
echo " Check logs in: ${LOGS_DIR}/aggregate_video"
echo ""
echo "After completion, your final dataset will be in:"
echo " ${OUTPUT_DIR}_final"
fi
###############################################################################
# Helpful information
###############################################################################
if [ "$1" != "aggregate" ]; then
echo ""
echo "=============================================="
echo "WORKFLOW SUMMARY"
echo "=============================================="
echo ""
echo "1. Step 1 is now running - it will:"
echo " - Split episodes across ${NUM_WORKERS} workers"
echo " - Each worker converts its episodes to video"
echo " - Creates shard datasets in ${OUTPUT_DIR}/shard_XXXX"
echo ""
echo "2. After Step 1 completes, run Step 2:"
echo " bash convert_to_video_parallel.sh aggregate"
echo ""
echo "3. Step 2 will merge all shards into a single dataset"
echo ""
echo "=============================================="
fi
@@ -0,0 +1,266 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Aggregate video dataset shards into a single dataset.
After parallel conversion using slurm_convert_to_video.py, this script merges
all the shard datasets into one final dataset.
Example usage:
python slurm_aggregate_video_shards.py \
--shards-dir /fsx/jade_choghari/libero_video \
--output-dir /fsx/jade_choghari/libero_video_final \
--output-repo-id lerobot_video \
--num-workers 100 \
--partition cpu_partition \
--cpus-per-task 16
"""
import argparse
import logging
from pathlib import Path
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
class AggregateVideoShards(PipelineStep):
"""Pipeline step that aggregates video dataset shards."""
def __init__(
self,
shards_dir: str | Path,
output_dir: str | Path,
output_repo_id: str,
num_shards: int,
):
super().__init__()
self.shards_dir = Path(shards_dir)
self.output_dir = Path(output_dir)
self.output_repo_id = output_repo_id
self.num_shards = num_shards
def run(self, data=None, rank: int = 0, world_size: int = 1):
"""Aggregate all shards into a single dataset."""
from lerobot.datasets.dataset_tools import merge_datasets
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.utils import init_logging
init_logging()
# Only worker 0 performs aggregation
if rank != 0:
logging.info(f"Worker {rank} skipping - only worker 0 performs aggregation")
return
logging.info(f"Starting aggregation of {self.num_shards} shards")
# Collect all shard datasets
shard_datasets = []
for shard_idx in range(self.num_shards):
shard_dir = self.shards_dir / f"shard_{shard_idx:04d}"
if not shard_dir.exists():
logging.warning(f"Shard directory not found: {shard_dir}")
continue
# Find the repo_id for this shard
shard_repo_id = f"{self.output_repo_id}_shard_{shard_idx:04d}"
try:
shard_dataset = LeRobotDataset(shard_repo_id, root=shard_dir)
shard_datasets.append(shard_dataset)
logging.info(
f"Loaded shard {shard_idx}: {shard_dataset.meta.total_episodes} episodes, "
f"{shard_dataset.meta.total_frames} frames"
)
except Exception as e:
logging.error(f"Failed to load shard {shard_idx}: {e}")
continue
if len(shard_datasets) == 0:
raise ValueError(f"No valid shards found in {self.shards_dir}")
logging.info(f"Successfully loaded {len(shard_datasets)} shards, starting merge")
# Merge all shards
self.output_dir.mkdir(parents=True, exist_ok=True)
merged_dataset = merge_datasets(
shard_datasets,
output_repo_id=self.output_repo_id,
output_dir=self.output_dir,
)
logging.info("✓ Aggregation complete!")
logging.info(f"Merged dataset saved to: {self.output_dir}")
logging.info(f"Total episodes: {merged_dataset.meta.total_episodes}")
logging.info(f"Total frames: {merged_dataset.meta.total_frames}")
def make_aggregate_executor(
shards_dir,
output_dir,
output_repo_id,
num_shards,
job_name,
logs_dir,
partition,
cpus_per_task,
mem_per_cpu,
time_limit,
slurm=True,
):
"""Create executor for shard aggregation."""
kwargs = {
"pipeline": [
AggregateVideoShards(
shards_dir=shards_dir,
output_dir=output_dir,
output_repo_id=output_repo_id,
num_shards=num_shards,
),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
# Only need 1 worker for aggregation
kwargs.update(
{
"job_name": job_name,
"tasks": 1,
"workers": 1,
"time": time_limit,
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": 1,
"workers": 1,
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser(
description="Aggregate video dataset shards into a single dataset",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--shards-dir",
type=Path,
required=True,
help="Directory containing shard_XXXX subdirectories",
)
parser.add_argument(
"--output-dir",
type=Path,
required=True,
help="Output directory for the aggregated dataset",
)
parser.add_argument(
"--output-repo-id",
type=str,
required=True,
help="Repository ID for the aggregated dataset",
)
parser.add_argument(
"--num-shards",
type=int,
required=True,
help="Number of shards to aggregate (should match --workers from conversion)",
)
parser.add_argument(
"--logs-dir",
type=Path,
required=True,
help="Path to logs directory for datatrove",
)
parser.add_argument(
"--job-name",
type=str,
default="aggregate_video_shards",
help="Job name for SLURM",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over SLURM (1) or locally (0)",
)
parser.add_argument(
"--partition",
type=str,
required=True,
help="SLURM partition (use CPU partition)",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=16,
help="Number of CPUs per task (aggregation can use more CPUs)",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="8G",
help="Memory per CPU",
)
parser.add_argument(
"--time-limit",
type=str,
default="08:00:00",
help="Time limit for SLURM job",
)
args = parser.parse_args()
# Convert slurm flag to boolean
slurm = args.slurm == 1
# Create and run executor
executor = make_aggregate_executor(
shards_dir=args.shards_dir,
output_dir=args.output_dir,
output_repo_id=args.output_repo_id,
num_shards=args.num_shards,
job_name=args.job_name,
logs_dir=args.logs_dir,
partition=args.partition,
cpus_per_task=args.cpus_per_task,
mem_per_cpu=args.mem_per_cpu,
time_limit=args.time_limit,
slurm=slurm,
)
logging.info("Starting shard aggregation")
executor.run()
logging.info("Aggregation job submitted/completed")
if __name__ == "__main__":
main()
@@ -0,0 +1,539 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Parallelize video conversion using SLURM and Datatrove.
This script converts an image-based LeRobot dataset to video format by distributing
episodes across multiple workers for parallel processing.
Example usage:
python slurm_convert_to_video.py \
--repo-id lerobot \
--root /fsx/jade_choghari/libero/ \
--output-dir /fsx/jade_choghari/libero_video \
--output-repo-id lerobot_video \
--workers 100 \
--partition cpu_partition \
--cpus-per-task 8 \
--logs-dir ./logs
"""
import argparse
import logging
from pathlib import Path
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
class ConvertEpisodesToVideo(PipelineStep):
"""Pipeline step that converts episodes to videos in parallel."""
def __init__(
self,
repo_id: str,
root: str | None,
output_dir: str,
output_repo_id: str | None,
vcodec: str = "libsvtav1",
pix_fmt: str = "yuv420p",
g: int = 2,
crf: int = 30,
fast_decode: int = 0,
num_image_workers: int = 4,
):
super().__init__()
self.repo_id = repo_id
self.root = root
self.output_dir = Path(output_dir)
self.output_repo_id = output_repo_id or f"{repo_id}_video"
self.vcodec = vcodec
self.pix_fmt = pix_fmt
self.g = g
self.crf = crf
self.fast_decode = fast_decode
self.num_image_workers = num_image_workers
def run(self, data=None, rank: int = 0, world_size: int = 1):
"""Process a shard of episodes."""
from datasets.utils.tqdm import disable_progress_bars
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.scripts.lerobot_edit_dataset import (
encode_episode_videos,
)
from lerobot.utils.utils import init_logging
import logging
init_logging()
disable_progress_bars()
logging.info(f"Worker {rank}/{world_size} starting video conversion")
# Load source dataset
dataset = LeRobotDataset(self.repo_id, root=self.root)
total_episodes = dataset.meta.total_episodes
# Determine which episodes this worker processes
episodes_per_worker = total_episodes // world_size
remainder = total_episodes % world_size
if rank < remainder:
# First 'remainder' workers get one extra episode
start_ep = rank * (episodes_per_worker + 1)
end_ep = start_ep + episodes_per_worker + 1
else:
start_ep = remainder * (episodes_per_worker + 1) + (rank - remainder) * episodes_per_worker
end_ep = start_ep + episodes_per_worker
episode_indices = list(range(start_ep, end_ep))
logging.info(
f"Worker {rank} processing episodes {start_ep} to {end_ep-1} ({len(episode_indices)} episodes)"
)
if len(episode_indices) == 0:
logging.info(f"Worker {rank} has no episodes to process")
return
# Create shard-specific output directory
import shutil
shard_output_dir = self.output_dir / f"shard_{rank:04d}"
# Remove existing directory to avoid conflicts with LeRobotDatasetMetadata.create
if shard_output_dir.exists():
logging.warning(f"Shard directory {shard_output_dir} already exists, removing it")
shutil.rmtree(shard_output_dir)
# Import conversion function
from lerobot.scripts.lerobot_edit_dataset import convert_dataset_to_videos
logging.info(
f"Worker {rank} converting {len(episode_indices)} episodes with codec {self.vcodec}, CRF {self.crf}"
)
# Convert this shard's episodes with remapped indices
# We need to remap episode indices to start from 0 for proper file structure
self._convert_shard_to_videos(
dataset=dataset,
shard_output_dir=shard_output_dir,
shard_repo_id=f"{self.output_repo_id}_shard_{rank:04d}",
episode_indices=episode_indices,
)
logging.info(f"Worker {rank} completed successfully")
def _convert_shard_to_videos(
self,
dataset,
shard_output_dir,
shard_repo_id,
episode_indices,
):
"""Convert a shard's episodes to videos with proper index remapping."""
import shutil
import pandas as pd
from tqdm import tqdm
from lerobot.datasets.lerobot_dataset import LeRobotDatasetMetadata
from lerobot.datasets.utils import write_info, write_stats, write_tasks
from lerobot.datasets.video_utils import encode_video_frames, get_video_info
from lerobot.scripts.lerobot_edit_dataset import (
save_episode_images_for_video,
_copy_data_without_images,
)
from lerobot.utils.constants import OBS_IMAGE
# Check that it's an image dataset
if len(dataset.meta.video_keys) > 0:
raise ValueError(
f"This operation is for image datasets only. Video dataset provided: {dataset.repo_id}"
)
# Get all image keys
hf_dataset = dataset.hf_dataset.with_format(None)
img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)]
if len(img_keys) == 0:
raise ValueError(f"No image keys found in dataset {dataset.repo_id}")
logging.info(f"Converting {len(episode_indices)} episodes with {len(img_keys)} cameras")
# Create new features dict, converting image features to video features
new_features = {}
for key, value in dataset.meta.features.items():
if key not in img_keys:
new_features[key] = value
else:
# Convert image key to video format
new_features[key] = value.copy()
new_features[key]["dtype"] = "video"
# Create new metadata for video dataset
new_meta = LeRobotDatasetMetadata.create(
repo_id=shard_repo_id,
fps=dataset.meta.fps,
features=new_features,
robot_type=dataset.meta.robot_type,
root=shard_output_dir,
use_videos=True,
chunks_size=dataset.meta.chunks_size,
data_files_size_in_mb=dataset.meta.data_files_size_in_mb,
video_files_size_in_mb=dataset.meta.video_files_size_in_mb,
)
# Create temporary directory for image extraction
temp_dir = shard_output_dir / "temp_images"
temp_dir.mkdir(parents=True, exist_ok=True)
# Process each episode with REMAPPED indices (0, 1, 2, ...)
all_episode_metadata = []
fps = int(dataset.fps)
try:
for new_ep_idx, orig_ep_idx in enumerate(tqdm(episode_indices, desc="Converting episodes")):
# Get episode metadata from source using ORIGINAL index
src_episode = dataset.meta.episodes[orig_ep_idx]
episode_length = src_episode["length"]
episode_duration = episode_length / dataset.fps
video_metadata = {}
# Encode videos for each camera
for img_key in img_keys:
# Save images temporarily using ORIGINAL episode index
imgs_dir = temp_dir / f"episode_{orig_ep_idx:06d}" / img_key
save_episode_images_for_video(
dataset, imgs_dir, img_key, orig_ep_idx, self.num_image_workers
)
# Determine chunk and file indices using NEW (remapped) episode index
chunk_idx = new_ep_idx // new_meta.chunks_size
file_idx = new_ep_idx % new_meta.chunks_size
# Create video path in the new dataset structure
video_path = new_meta.root / new_meta.video_path.format(
video_key=img_key, chunk_index=chunk_idx, file_index=file_idx
)
video_path.parent.mkdir(parents=True, exist_ok=True)
# Encode video
encode_video_frames(
imgs_dir=imgs_dir,
video_path=video_path,
fps=fps,
vcodec=self.vcodec,
pix_fmt=self.pix_fmt,
g=self.g,
crf=self.crf,
fast_decode=self.fast_decode,
overwrite=True,
)
# Clean up temporary images
shutil.rmtree(imgs_dir)
# Store video metadata
video_metadata[img_key] = {
f"videos/{img_key}/chunk_index": chunk_idx,
f"videos/{img_key}/file_index": file_idx,
f"videos/{img_key}/from_timestamp": 0.0,
f"videos/{img_key}/to_timestamp": episode_duration,
}
# Build episode metadata using NEW index
episode_meta = {
"episode_index": new_ep_idx,
"length": episode_length,
"dataset_from_index": new_ep_idx * episode_length,
"dataset_to_index": (new_ep_idx + 1) * episode_length,
}
# Add video metadata
for img_key in img_keys:
episode_meta.update(video_metadata[img_key])
# Add data chunk/file info
if "data/chunk_index" in src_episode:
episode_meta["data/chunk_index"] = src_episode["data/chunk_index"]
episode_meta["data/file_index"] = src_episode["data/file_index"]
all_episode_metadata.append(episode_meta)
# Copy and transform data files (removing image columns)
_copy_data_without_images(dataset, new_meta, episode_indices, img_keys)
# Save episode metadata
episodes_df = pd.DataFrame(all_episode_metadata)
episodes_path = new_meta.root / "meta" / "episodes" / "chunk-000" / "file-000.parquet"
episodes_path.parent.mkdir(parents=True, exist_ok=True)
episodes_df.to_parquet(episodes_path, index=False)
# Update metadata info
new_meta.info["total_episodes"] = len(episode_indices)
new_meta.info["total_frames"] = sum(ep["length"] for ep in all_episode_metadata)
new_meta.info["total_tasks"] = dataset.meta.total_tasks
new_meta.info["splits"] = {"train": f"0:{len(episode_indices)}"}
# Update video info for all image keys using the actual first video file
for img_key in img_keys:
if not new_meta.features[img_key].get("info", None):
# Use the first actually created video file
chunk_idx = all_episode_metadata[0][f"videos/{img_key}/chunk_index"]
file_idx = all_episode_metadata[0][f"videos/{img_key}/file_index"]
video_path = new_meta.root / new_meta.video_path.format(
video_key=img_key, chunk_index=chunk_idx, file_index=file_idx
)
new_meta.info["features"][img_key]["info"] = get_video_info(video_path)
write_info(new_meta.info, new_meta.root)
# Copy stats and tasks
if dataset.meta.stats is not None:
# Remove image stats
new_stats = {k: v for k, v in dataset.meta.stats.items() if k not in img_keys}
write_stats(new_stats, new_meta.root)
if dataset.meta.tasks is not None:
write_tasks(dataset.meta.tasks, new_meta.root)
finally:
# Clean up temporary directory
if temp_dir.exists():
shutil.rmtree(temp_dir)
def make_convert_executor(
repo_id,
root,
output_dir,
output_repo_id,
vcodec,
pix_fmt,
g,
crf,
fast_decode,
num_image_workers,
job_name,
logs_dir,
workers,
partition,
cpus_per_task,
mem_per_cpu,
time_limit,
slurm=True,
):
"""Create executor for parallel video conversion."""
kwargs = {
"pipeline": [
ConvertEpisodesToVideo(
repo_id=repo_id,
root=root,
output_dir=output_dir,
output_repo_id=output_repo_id,
vcodec=vcodec,
pix_fmt=pix_fmt,
g=g,
crf=crf,
fast_decode=fast_decode,
num_image_workers=num_image_workers,
),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": workers, # Number of parallel tasks = number of workers
"workers": workers,
"time": time_limit,
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": workers,
"workers": 1, # Local mode: sequential
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser(
description="Parallelize video conversion across SLURM workers",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
# Input/Output paths
parser.add_argument(
"--repo-id",
type=str,
required=True,
help="Repository ID of the source image dataset",
)
parser.add_argument(
"--root",
type=str,
default=None,
help="Root directory containing the source dataset (default: HF cache)",
)
parser.add_argument(
"--output-dir",
type=str,
required=True,
help="Output directory for the video dataset",
)
parser.add_argument(
"--output-repo-id",
type=str,
default=None,
help="Repository ID for output dataset (default: <repo_id>_video)",
)
# Video encoding parameters
parser.add_argument(
"--vcodec",
type=str,
default="libsvtav1",
help="Video codec",
)
parser.add_argument(
"--pix-fmt",
type=str,
default="yuv420p",
help="Pixel format",
)
parser.add_argument(
"--g",
type=int,
default=2,
help="GOP size (group of pictures)",
)
parser.add_argument(
"--crf",
type=int,
default=30,
help="Constant rate factor (quality)",
)
parser.add_argument(
"--fast-decode",
type=int,
default=0,
help="Fast decode tuning",
)
parser.add_argument(
"--num-image-workers",
type=int,
default=4,
help="Number of threads per worker for saving images",
)
# SLURM parameters
parser.add_argument(
"--logs-dir",
type=Path,
required=True,
help="Path to logs directory for datatrove",
)
parser.add_argument(
"--job-name",
type=str,
default="convert_to_video",
help="Job name for SLURM",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over SLURM (1) or locally (0)",
)
parser.add_argument(
"--workers",
type=int,
default=100,
help="Number of parallel workers (each processes different episodes)",
)
parser.add_argument(
"--partition",
type=str,
required=True,
help="SLURM partition (use CPU partition)",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=8,
help="Number of CPUs per SLURM task",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="4G",
help="Memory per CPU",
)
parser.add_argument(
"--time-limit",
type=str,
default="08:00:00",
help="Time limit for SLURM job",
)
args = parser.parse_args()
# Convert slurm flag to boolean
slurm = args.slurm == 1
# Create and run executor
executor = make_convert_executor(
repo_id=args.repo_id,
root=args.root,
output_dir=args.output_dir,
output_repo_id=args.output_repo_id,
vcodec=args.vcodec,
pix_fmt=args.pix_fmt,
g=args.g,
crf=args.crf,
fast_decode=args.fast_decode,
num_image_workers=args.num_image_workers,
job_name=args.job_name,
logs_dir=args.logs_dir,
workers=args.workers,
partition=args.partition,
cpus_per_task=args.cpus_per_task,
mem_per_cpu=args.mem_per_cpu,
time_limit=args.time_limit,
slurm=slurm,
)
logging.info(f"Starting video conversion with {args.workers} workers")
executor.run()
logging.info("All workers submitted/completed")
if __name__ == "__main__":
main()
+7
View File
@@ -0,0 +1,7 @@
==============================================
STEP 1: Starting parallel video conversion
Workers: 2
Input: lerobot (root: /fsx/jade_choghari/vlabench-primitive/)
Output: /fsx/jade_choghari/vlabench-primitive-encoded
==============================================
python: can't open file '/admin/home/jade_choghari/lerobot/slurm_convert_to_video.py': [Errno 2] No such file or directory
-4
View File
@@ -81,14 +81,10 @@ class AdamWConfig(OptimizerConfig):
eps: float = 1e-8
weight_decay: float = 1e-2
grad_clip_norm: float = 10.0
fused: bool = False
def build(self, params: dict) -> torch.optim.Optimizer:
kwargs = asdict(self)
kwargs.pop("grad_clip_norm")
# Fused optimizer only works on CUDA
if kwargs.get("fused") and not torch.cuda.is_available():
kwargs["fused"] = False
return torch.optim.AdamW(params, **kwargs)
@@ -136,7 +136,6 @@ class ACTConfig(PreTrainedConfig):
optimizer_lr: float = 1e-5
optimizer_weight_decay: float = 1e-4
optimizer_lr_backbone: float = 1e-5
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
def __post_init__(self):
super().__post_init__()
@@ -165,7 +164,6 @@ class ACTConfig(PreTrainedConfig):
return AdamWConfig(
lr=self.optimizer_lr,
weight_decay=self.optimizer_weight_decay,
fused=self.optimizer_fused,
)
def get_scheduler_preset(self) -> None:
@@ -94,7 +94,6 @@ class GrootConfig(PreTrainedConfig):
optimizer_betas: tuple[float, float] = (0.95, 0.999)
optimizer_eps: float = 1e-8
optimizer_weight_decay: float = 1e-5
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
warmup_ratio: float = 0.05
use_bf16: bool = True
@@ -175,7 +174,6 @@ class GrootConfig(PreTrainedConfig):
betas=self.optimizer_betas,
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
fused=self.optimizer_fused,
)
def get_scheduler_preset(self) -> CosineDecayWithWarmupSchedulerConfig:
@@ -23,8 +23,6 @@ from lerobot.optim.schedulers import CosineDecayWithWarmupSchedulerConfig
from lerobot.policies.rtc.configuration_rtc import RTCConfig
from lerobot.utils.constants import OBS_IMAGES
DEFAULT_IMAGE_SIZE = 224
@PreTrainedConfig.register_subclass("pi0")
@dataclass
@@ -53,10 +51,7 @@ class PI0Config(PreTrainedConfig):
# Real-Time Chunking (RTC) configuration
rtc_config: RTCConfig | None = None
image_resolution: tuple[int, int] = (
DEFAULT_IMAGE_SIZE,
DEFAULT_IMAGE_SIZE,
) # see openpi `preprocessing_pytorch.py`
image_resolution: tuple[int, int] = (224, 224) # see openpi `preprocessing_pytorch.py`
# Add empty images. Used to add empty cameras when no image features are present.
empty_cameras: int = 0
@@ -74,7 +69,6 @@ class PI0Config(PreTrainedConfig):
gradient_checkpointing: bool = False # Enable gradient checkpointing for memory optimization
compile_model: bool = False # Whether to use torch.compile for model optimization
compile_mode: str = "max-autotune" # Torch compile mode
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
device: str | None = None # Device to use for the model (None = auto-detect)
# Optimizer settings: see openpi `AdamW``
@@ -142,7 +136,6 @@ class PI0Config(PreTrainedConfig):
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
grad_clip_norm=self.optimizer_grad_clip_norm,
fused=self.optimizer_fused,
)
def get_scheduler_preset(self):
+13 -14
View File
@@ -41,7 +41,7 @@ else:
PaliGemmaForConditionalGeneration = None
from lerobot.configs.policies import PreTrainedConfig
from lerobot.policies.pi0.configuration_pi0 import DEFAULT_IMAGE_SIZE, PI0Config
from lerobot.policies.pi0.configuration_pi0 import PI0Config
from lerobot.policies.pretrained import PreTrainedPolicy, T
from lerobot.policies.rtc.modeling_rtc import RTCProcessor
from lerobot.utils.constants import (
@@ -337,7 +337,6 @@ class PaliGemmaWithExpertModel(
action_expert_config,
use_adarms=None,
precision: Literal["bfloat16", "float32"] = "bfloat16",
image_size: int = DEFAULT_IMAGE_SIZE,
):
if use_adarms is None:
use_adarms = [False, False]
@@ -357,7 +356,6 @@ class PaliGemmaWithExpertModel(
vlm_config_hf.text_config.vocab_size = 257152
vlm_config_hf.text_config.use_adarms = use_adarms[0]
vlm_config_hf.text_config.adarms_cond_dim = vlm_config.width if use_adarms[0] else None
vlm_config_hf.vision_config.image_size = image_size
vlm_config_hf.vision_config.intermediate_size = 4304
vlm_config_hf.vision_config.projection_dim = 2048
vlm_config_hf.vision_config.projector_hidden_act = "gelu_fast"
@@ -521,17 +519,11 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
paligemma_config = get_gemma_config(config.paligemma_variant)
action_expert_config = get_gemma_config(config.action_expert_variant)
if config.image_resolution[0] != config.image_resolution[1]:
raise ValueError(
f"PaliGemma expects square image resolution, invalid resolution: {config.image_resolution}"
)
self.paligemma_with_expert = PaliGemmaWithExpertModel(
paligemma_config,
action_expert_config,
use_adarms=[False, False],
precision=config.dtype,
image_size=config.image_resolution[0],
)
self.action_in_proj = nn.Linear(config.max_action_dim, action_expert_config.width)
@@ -820,13 +812,16 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
)
dt = -1.0 / num_steps
dt = torch.tensor(dt, dtype=torch.float32, device=device)
x_t = noise
for step in range(num_steps):
time = 1.0 + step * dt
time_tensor = torch.tensor(time, dtype=torch.float32, device=device).expand(bsize)
time = torch.tensor(1.0, dtype=torch.float32, device=device)
while time >= -dt / 2:
expanded_time = time.expand(bsize)
def denoise_step_partial_call(input_x_t, current_timestep=time_tensor):
# Define a closure function to properly capture expanded_time
# This avoids the lambda expression (E731) and loop variable binding (B023) issues
def denoise_step_partial_call(input_x_t, current_timestep=expanded_time):
return self.denoise_step(
state=state,
prefix_pad_masks=prefix_pad_masks,
@@ -851,11 +846,15 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
else:
v_t = denoise_step_partial_call(x_t)
x_t = x_t + dt * v_t
# Euler step
x_t += dt * v_t
# Record x_t and v_t after Euler step
if self.rtc_processor is not None and self.rtc_processor.is_debug_enabled():
self.rtc_processor.track(time=time, x_t=x_t, v_t=v_t)
time += dt
return x_t
def denoise_step(
@@ -22,8 +22,6 @@ from lerobot.optim.optimizers import AdamWConfig
from lerobot.optim.schedulers import CosineDecayWithWarmupSchedulerConfig
from lerobot.policies.rtc.configuration_rtc import RTCConfig
DEFAULT_IMAGE_SIZE = 224
@PreTrainedConfig.register_subclass("pi05")
@dataclass
@@ -52,10 +50,7 @@ class PI05Config(PreTrainedConfig):
# Real-Time Chunking (RTC) configuration
rtc_config: RTCConfig | None = None
image_resolution: tuple[int, int] = (
DEFAULT_IMAGE_SIZE,
DEFAULT_IMAGE_SIZE,
) # see openpi `preprocessing_pytorch.py`
image_resolution: tuple[int, int] = (224, 224) # see openpi `preprocessing_pytorch.py`
# Add empty images. Used to add empty cameras when no image features are present.
empty_cameras: int = 0
@@ -74,7 +69,6 @@ class PI05Config(PreTrainedConfig):
gradient_checkpointing: bool = False # Enable gradient checkpointing for memory optimization
compile_model: bool = False # Whether to use torch.compile for model optimization
compile_mode: str = "max-autotune" # Torch compile mode
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
device: str | None = None # Device to use for the model (None = auto-detect)
# Optimizer settings: see openpi `AdamW`
@@ -142,7 +136,6 @@ class PI05Config(PreTrainedConfig):
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
grad_clip_norm=self.optimizer_grad_clip_norm,
fused=self.optimizer_fused,
)
def get_scheduler_preset(self):
+13 -14
View File
@@ -41,7 +41,7 @@ else:
PaliGemmaForConditionalGeneration = None
from lerobot.configs.policies import PreTrainedConfig
from lerobot.policies.pi05.configuration_pi05 import DEFAULT_IMAGE_SIZE, PI05Config
from lerobot.policies.pi05.configuration_pi05 import PI05Config
from lerobot.policies.pretrained import PreTrainedPolicy, T
from lerobot.policies.rtc.modeling_rtc import RTCProcessor
from lerobot.utils.constants import (
@@ -336,7 +336,6 @@ class PaliGemmaWithExpertModel(
action_expert_config,
use_adarms=None,
precision: Literal["bfloat16", "float32"] = "bfloat16",
image_size: int = DEFAULT_IMAGE_SIZE,
):
if use_adarms is None:
use_adarms = [False, False]
@@ -356,7 +355,6 @@ class PaliGemmaWithExpertModel(
vlm_config_hf.text_config.vocab_size = 257152
vlm_config_hf.text_config.use_adarms = use_adarms[0]
vlm_config_hf.text_config.adarms_cond_dim = vlm_config.width if use_adarms[0] else None
vlm_config_hf.vision_config.image_size = image_size
vlm_config_hf.vision_config.intermediate_size = 4304
vlm_config_hf.vision_config.projection_dim = 2048
vlm_config_hf.vision_config.projector_hidden_act = "gelu_fast"
@@ -520,17 +518,11 @@ class PI05Pytorch(nn.Module): # see openpi `PI0Pytorch`
paligemma_config = get_gemma_config(config.paligemma_variant)
action_expert_config = get_gemma_config(config.action_expert_variant)
if config.image_resolution[0] != config.image_resolution[1]:
raise ValueError(
f"PaliGemma expects square image resolution, invalid resolution: {config.image_resolution}"
)
self.paligemma_with_expert = PaliGemmaWithExpertModel(
paligemma_config,
action_expert_config,
use_adarms=[False, True],
precision=config.dtype,
image_size=config.image_resolution[0],
)
self.action_in_proj = nn.Linear(config.max_action_dim, action_expert_config.width)
@@ -795,13 +787,16 @@ class PI05Pytorch(nn.Module): # see openpi `PI0Pytorch`
)
dt = -1.0 / num_steps
dt = torch.tensor(dt, dtype=torch.float32, device=device)
x_t = noise
for step in range(num_steps):
time = 1.0 + step * dt
time_tensor = torch.tensor(time, dtype=torch.float32, device=device).expand(bsize)
time = torch.tensor(1.0, dtype=torch.float32, device=device)
while time >= -dt / 2:
expanded_time = time.expand(bsize)
def denoise_step_partial_call(input_x_t, current_timestep=time_tensor):
# Define a closure function to properly capture expanded_time
# This avoids the lambda expression (E731) and loop variable binding (B023) issues
def denoise_step_partial_call(input_x_t, current_timestep=expanded_time):
return self.denoise_step(
prefix_pad_masks=prefix_pad_masks,
past_key_values=past_key_values,
@@ -825,11 +820,15 @@ class PI05Pytorch(nn.Module): # see openpi `PI0Pytorch`
else:
v_t = denoise_step_partial_call(x_t)
x_t = x_t + dt * v_t
# Euler step
x_t += dt * v_t
# Record x_t and v_t after Euler step
if self.rtc_processor is not None and self.rtc_processor.is_debug_enabled():
self.rtc_processor.track(time=time, x_t=x_t, v_t=v_t)
time += dt
return x_t
def denoise_step(
@@ -79,7 +79,6 @@ class SmolVLAConfig(PreTrainedConfig):
optimizer_eps: float = 1e-8
optimizer_weight_decay: float = 1e-10
optimizer_grad_clip_norm: float = 10
optimizer_fused: bool = False
scheduler_warmup_steps: int = 1_000
scheduler_decay_steps: int = 30_000
@@ -137,7 +136,6 @@ class SmolVLAConfig(PreTrainedConfig):
eps=self.optimizer_eps,
weight_decay=self.optimizer_weight_decay,
grad_clip_norm=self.optimizer_grad_clip_norm,
fused=self.optimizer_fused,
)
def get_scheduler_preset(self):
@@ -783,15 +783,18 @@ class VLAFlowMatching(nn.Module):
use_cache=self.config.use_cache,
fill_kv_cache=True,
)
num_steps = self.config.num_steps
dt = -1.0 / num_steps
dt = -1.0 / self.config.num_steps
dt = torch.tensor(dt, dtype=torch.float32, device=device)
x_t = noise
for step in range(num_steps):
time = 1.0 + step * dt
time_tensor = torch.tensor(time, dtype=torch.float32, device=device).expand(bsize)
time = torch.tensor(1.0, dtype=torch.float32, device=device)
def denoise_step_partial_call(input_x_t, current_timestep=time_tensor):
while time >= -dt / 2:
expanded_time = time.expand(bsize)
# Define a closure function to properly capture expanded_time
# This avoids the lambda expression (E731) and loop variable binding (B023) issues
def denoise_step_partial_call(input_x_t, current_timestep=expanded_time):
return self.denoise_step(
x_t=input_x_t,
prefix_pad_masks=prefix_pad_masks,
@@ -815,11 +818,15 @@ class VLAFlowMatching(nn.Module):
else:
v_t = denoise_step_partial_call(x_t)
x_t = x_t + dt * v_t
# Euler step
x_t += dt * v_t
# Record x_t and v_t after Euler step (other params are recorded in rtc_processor.denoise_step)
if self.rtc_processor is not None and self.rtc_processor.is_debug_enabled():
self.rtc_processor.track(time=time, x_t=x_t, v_t=v_t)
time += dt
return x_t
def denoise_step(