mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-17 16:27:04 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6216932fb0 | |||
| 5fab1ed5cd | |||
| 8b7c46c5f7 | |||
| ba97f64afd | |||
| 8d861fe94b | |||
| d22fa6446b |
@@ -31,8 +31,7 @@ jobs:
|
||||
name: Upload Preview and Comment
|
||||
if: >
|
||||
github.event.workflow_run.event == 'pull_request' &&
|
||||
github.event.workflow_run.conclusion == 'success' &&
|
||||
github.repository == 'huggingface/lerobot'
|
||||
github.event.workflow_run.conclusion == 'success'
|
||||
uses: huggingface/doc-builder/.github/workflows/upload_pr_documentation.yml@main
|
||||
with:
|
||||
package_name: lerobot
|
||||
|
||||
@@ -42,9 +42,7 @@ jobs:
|
||||
# This job builds and deploys the official documentation.
|
||||
build_main_docs:
|
||||
name: Build Main Docs
|
||||
if: >
|
||||
(github.event_name == 'push' || github.event_name == 'workflow_dispatch') &&
|
||||
github.repository == 'huggingface/lerobot'
|
||||
if: github.event_name == 'push' || github.event_name == 'workflow_dispatch'
|
||||
permissions:
|
||||
contents: read
|
||||
uses: huggingface/doc-builder/.github/workflows/build_main_documentation.yml@main
|
||||
@@ -60,7 +58,7 @@ jobs:
|
||||
# The result of this job triggers the 'Upload PR Documentation' workflow.
|
||||
build_pr_docs:
|
||||
name: Build PR Docs
|
||||
if: github.event_name == 'pull_request' && github.repository == 'huggingface/lerobot'
|
||||
if: github.event_name == 'pull_request'
|
||||
permissions:
|
||||
contents: read
|
||||
pull-requests: write
|
||||
|
||||
@@ -45,6 +45,7 @@ permissions:
|
||||
env:
|
||||
UV_VERSION: "0.8.0"
|
||||
PYTHON_VERSION: "3.10"
|
||||
DOCKER_IMAGE_NAME: huggingface/lerobot-gpu
|
||||
|
||||
# Ensures that only the latest commit for a PR or branch is built, canceling older runs.
|
||||
concurrency:
|
||||
|
||||
@@ -43,7 +43,6 @@ jobs:
|
||||
name: Build CPU Docker for Nightly
|
||||
runs-on:
|
||||
group: aws-general-8-plus
|
||||
if: github.repository == 'huggingface/lerobot'
|
||||
outputs:
|
||||
image_tag: ${{ env.DOCKER_IMAGE_NAME_CPU }}
|
||||
steps:
|
||||
@@ -78,7 +77,6 @@ jobs:
|
||||
name: Build GPU Docker for Nightly
|
||||
runs-on:
|
||||
group: aws-general-8-plus
|
||||
if: github.repository == 'huggingface/lerobot'
|
||||
outputs:
|
||||
image_tag: ${{ env.DOCKER_IMAGE_NAME_GPU }}
|
||||
steps:
|
||||
|
||||
@@ -29,7 +29,6 @@ jobs:
|
||||
build-and-publish:
|
||||
name: Build and publish Python distributions
|
||||
runs-on: ubuntu-latest
|
||||
if: github.repository == 'huggingface/lerobot'
|
||||
outputs:
|
||||
version: ${{ steps.extract_info.outputs.tag_version }}
|
||||
permissions:
|
||||
|
||||
@@ -45,7 +45,6 @@ jobs:
|
||||
stale:
|
||||
name: Close Stale Issues and PRs
|
||||
runs-on: ubuntu-latest
|
||||
if: github.repository == 'huggingface/lerobot'
|
||||
permissions:
|
||||
actions: write
|
||||
contents: write # only for delete-branch option
|
||||
|
||||
@@ -43,7 +43,6 @@ jobs:
|
||||
full-tests:
|
||||
name: Full Unbound Tests
|
||||
runs-on: ubuntu-latest
|
||||
if: github.repository == 'huggingface/lerobot'
|
||||
env:
|
||||
MUJOCO_GL: egl
|
||||
HF_HOME: /mnt/cache/.cache/huggingface
|
||||
|
||||
+82
-40
@@ -24,7 +24,7 @@ Built from pure Transformer encoders, X-VLA scales naturally with model size and
|
||||
<img
|
||||
src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/lerobot/xvla-architecture2.png"
|
||||
alt="XVLA Architecture 2"
|
||||
style="width: 60%; height: auto;"
|
||||
style="width: 32%; max-width: 450px; height: auto;"
|
||||
/>
|
||||
</p>
|
||||
|
||||
@@ -120,7 +120,7 @@ Adapted for Google Robot platforms.
|
||||
|
||||
### Recommended Training Configuration
|
||||
|
||||
When fine-tuning X-VLA for a new embodiment or task, we recommend not freezing the VLM, and also setting the `policy.dtype=bfloat16` to not hit OOM errors.
|
||||
When fine-tuning X-VLA for a new embodiment or task, we recommend the following freezing strategy:
|
||||
|
||||
```bash
|
||||
lerobot-train \
|
||||
@@ -129,26 +129,25 @@ lerobot-train \
|
||||
--job_name=xvla_training \
|
||||
--policy.path="lerobot/xvla-base" \
|
||||
--policy.repo_id="HF_USER/xvla-your-robot" \
|
||||
--policy.dtype=bfloat16 \
|
||||
--steps=3000 \
|
||||
--policy.device=cuda \
|
||||
--policy.freeze_vision_encoder=false \
|
||||
--policy.freeze_language_encoder=false \
|
||||
--policy.train_policy_transformer=true \
|
||||
--policy.train_soft_prompts=true \
|
||||
--policy.freeze_vision_encoder=True \
|
||||
--policy.freeze_language_encoder=True \
|
||||
--policy.train_policy_transformer=True \
|
||||
--policy.train_soft_prompts=True \
|
||||
--policy.action_mode=YOUR_ACTION_MODE
|
||||
```
|
||||
|
||||
### Training Parameters Explained
|
||||
|
||||
| Parameter | Default | Description |
|
||||
| -------------------------- | ------- | ---------------------------------------------- |
|
||||
| `freeze_vision_encoder` | `false` | Do not freeze the VLM vision encoder weights |
|
||||
| `freeze_language_encoder` | `false` | Do not freeze the VLM language encoder weights |
|
||||
| `train_policy_transformer` | `true` | Allow policy transformer layers to train |
|
||||
| `train_soft_prompts` | `true` | Allow soft prompts to train |
|
||||
| Parameter | Default | Description |
|
||||
| -------------------------- | ------- | ---------------------------------------- |
|
||||
| `freeze_vision_encoder` | `True` | Freeze the VLM vision encoder weights |
|
||||
| `freeze_language_encoder` | `True` | Freeze the VLM language encoder weights |
|
||||
| `train_policy_transformer` | `True` | Allow policy transformer layers to train |
|
||||
| `train_soft_prompts` | `True` | Allow soft prompts to train |
|
||||
|
||||
**💡 Best Practice**: For Phase II adaptation to new embodiments, do not freeze the VLM encoders and also train the policy transformer and soft prompts.
|
||||
**💡 Best Practice**: For Phase II adaptation to new embodiments, freeze the VLM encoders and only train the policy transformer and soft prompts. This provides excellent sample efficiency with minimal compute.
|
||||
|
||||
### Example: Training on Bimanual Robot
|
||||
|
||||
@@ -158,15 +157,14 @@ lerobot-train \
|
||||
--output_dir=./outputs/xvla_bimanual \
|
||||
--job_name=xvla_so101_training \
|
||||
--policy.path="lerobot/xvla-base" \
|
||||
--policy.dtype=bfloat16 \
|
||||
--policy.repo_id="YOUR_USERNAME/xvla-biso101" \
|
||||
--steps=3000 \
|
||||
--policy.device=cuda \
|
||||
--policy.action_mode=so101_bimanual \
|
||||
--policy.freeze_vision_encoder=false \
|
||||
--policy.freeze_language_encoder=false \
|
||||
--policy.train_policy_transformer=true \
|
||||
--policy.train_soft_prompts=true
|
||||
--policy.freeze_vision_encoder=True \
|
||||
--policy.freeze_language_encoder=True \
|
||||
--policy.train_policy_transformer=True \
|
||||
--policy.train_soft_prompts=True
|
||||
```
|
||||
|
||||
💡 **Best Performance:** If you have sufficient computational resources and want to achieve best X-VLA finetuning performance, you should follow the official finetuning strategy:
|
||||
@@ -174,7 +172,71 @@ lerobot-train \
|
||||
**🔥 Full-finetune all components with a custom learning-rate scheme**
|
||||
|
||||
To ensure stable optimization, the Vision-Language Model (VLM) must be trained with only 1/10 of the base learning rate, while all other components use the full LR.
|
||||
This LR ratio is crucial for achieving strong and stable finetuning performance. This is already done for you by default.
|
||||
This LR ratio is crucial for achieving strong and stable finetuning performance.
|
||||
To enable this behavior, you must:
|
||||
|
||||
1. Implement a custom optimizer and register it in your training config
|
||||
|
||||
```
|
||||
from dataclasses import dataclass, asdict
|
||||
from lerobot.optim.optimizers import OptimizerConfig
|
||||
import torch
|
||||
|
||||
@OptimizerConfig.register_subclass("xvla-adamw")
|
||||
@dataclass
|
||||
class XVLAAdamW(OptimizerConfig):
|
||||
lr: float = 1e-4
|
||||
betas: tuple[float, float] = (0.9, 0.99)
|
||||
eps: float = 1e-8
|
||||
weight_decay: float = 0.0
|
||||
grad_clip_norm: float = 10.0
|
||||
|
||||
def build(self, params: dict) -> torch.optim.Optimizer:
|
||||
"""
|
||||
Expect `named_parameters()` as input.
|
||||
Apply lr = lr / 10 for all VLM-related parameters.
|
||||
"""
|
||||
assert isinstance(params, dict), \
|
||||
"Custom LR optimizer requires `named_parameters()` as inputs."
|
||||
kwargs = asdict(self)
|
||||
kwargs.pop("grad_clip_norm")
|
||||
vlm_group, other_group = [], []
|
||||
for name, p in params.items():
|
||||
if not p.requires_grad:
|
||||
continue
|
||||
if "vlm" in name.lower():
|
||||
vlm_group.append(p)
|
||||
else:
|
||||
other_group.append(p)
|
||||
|
||||
param_groups = [
|
||||
{"params": vlm_group, "lr": self.lr * 0.1, "weight_decay": self.weight_decay * 0.1},
|
||||
{"params": other_group, "lr": self.lr, "weight_decay": self.weight_decay},
|
||||
]
|
||||
|
||||
return torch.optim.AdamW(param_groups, **kwargs)
|
||||
```
|
||||
|
||||
2. Modify X-VLA’s get_optim_params to return named parameters
|
||||
|
||||
Replace:
|
||||
|
||||
```
|
||||
def get_optim_params(self) -> dict:
|
||||
"""Return only trainable parameters for optimization."""
|
||||
return filter(lambda p: p.requires_grad, self.parameters())
|
||||
```
|
||||
|
||||
with:
|
||||
|
||||
```
|
||||
def get_optim_params(self):
|
||||
"""Return trainable named parameters."""
|
||||
return filter(lambda kv: kv[1].requires_grad, self.named_parameters())
|
||||
```
|
||||
|
||||
This ensures the optimizer receives a dict of named parameters, allowing it to correctly detect VLM modules and apply the 1/10 LR rule.
|
||||
|
||||
❕Note
|
||||
|
||||
Completely matching the official reported performance may require an additional warm-up LR schedule for soft-prompts, which can bring minor improvements.
|
||||
@@ -264,26 +326,6 @@ domain_id = 3
|
||||
|
||||
The domain_id is automatically added to observations by the `XVLAAddDomainIdProcessorStep` in the preprocessing pipeline.
|
||||
|
||||
The `lerobot/xvla-base` model has been trained on the following domain IDs. It is recommended to choose one that most resembles your robot/configuration:
|
||||
|
||||
#### Fine-tuning Datasets
|
||||
|
||||
| Dataset Name | Domain ID |
|
||||
| ---------------- | --------- |
|
||||
| Bridge | 0 |
|
||||
| RT1 | 1 |
|
||||
| Calvin | 2 |
|
||||
| libero | 3 |
|
||||
| widowx-air | 4 |
|
||||
| AIR-AGILEX-HQ | 5 |
|
||||
| robotwin2_abs_ee | 6 |
|
||||
| robotwin2_clean | 6 |
|
||||
| robocasa-human | 7 |
|
||||
| VLABench | 8 |
|
||||
| AGIBOT-challenge | 9 |
|
||||
| AIR-AGILEX | 10 |
|
||||
| AIRBOT | 18 |
|
||||
|
||||
### 3. Processor Steps
|
||||
|
||||
X-VLA requires specific preprocessing and postprocessing steps for proper operation.
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
Goal:
|
||||
Create a high and low policy, the high policy take obs + text and return text, its a VLM
|
||||
The low policy is a VLA that take text returned by the high and ouptut actions
|
||||
|
||||
|
||||
High policy is ran every one second or when user send prompt
|
||||
|
||||
|
||||
Synthetic data generation:
|
||||
D demo -> teleop data with a global task annotation
|
||||
D label -> segment data into short skills (one to three seconds)
|
||||
D syn: p-gen will create the high level prompt user might gave to p-hi
|
||||
Given D label prompt p-gen to imagine appropriate action, take images, ALL PRIOR skill labels in the episode: ℓ̂₀, …, ℓ̂ₜ₋₁
|
||||
|
||||
“Given the scene + all previous steps + current needed skill ℓ̂₅,
|
||||
generate a user request that logically leads to ℓ̂₅.”
|
||||
|
||||
Train:
|
||||
Phi(lt| images, global label) cross entropy - next token predictions
|
||||
Plow(At| images, qt, lt)
|
||||
@@ -0,0 +1,141 @@
|
||||
#!/bin/bash
|
||||
#SBATCH --time=24:00:00
|
||||
#SBATCH --partition=hopper-cpu
|
||||
#SBATCH --cpus-per-task=96
|
||||
#SBATCH --output=/fsx/jade_choghari/logs/launcher_%j.out
|
||||
#SBATCH --error=/fsx/jade_choghari/logs/launcher_%j.err
|
||||
# Activate conda environment
|
||||
# Load conda
|
||||
source /fsx/jade_choghari/miniforge3/etc/profile.d/conda.sh
|
||||
conda activate lerobot
|
||||
set -e # Exit on error
|
||||
|
||||
# Input dataset
|
||||
REPO_ID="lerobot"
|
||||
ROOT="/fsx/jade_choghari/vlabench-primitive/"
|
||||
|
||||
# Output paths
|
||||
OUTPUT_DIR="/fsx/jade_choghari/vlabench-primitive-encoded"
|
||||
OUTPUT_REPO_ID="vlabench-primitive-encoded"
|
||||
LOGS_DIR="/fsx/jade_choghari/logs/convert_video"
|
||||
|
||||
# Video encoding settings
|
||||
VCODEC="libsvtav1"
|
||||
PIX_FMT="yuv420p"
|
||||
GOP_SIZE=2
|
||||
CRF=30
|
||||
FAST_DECODE=0
|
||||
|
||||
# Parallelization settings
|
||||
NUM_WORKERS=24 # Number of parallel SLURM workers
|
||||
NUM_IMAGE_WORKERS=4 # Threads per worker for image saving
|
||||
|
||||
# SLURM settings
|
||||
PARTITION="hopper-cpu" # Change to your CPU partition name
|
||||
CPUS_PER_TASK=32 # CPUs per worker
|
||||
MEM_PER_CPU="4G" # Memory per CPU
|
||||
TIME_LIMIT="24:00:00" # Time limit per job
|
||||
|
||||
###############################################################################
|
||||
# STEP 1: Parallel Video Conversion
|
||||
###############################################################################
|
||||
rm -rf "${OUTPUT_DIR}"
|
||||
mkdir -p "${OUTPUT_DIR}"
|
||||
|
||||
echo "=============================================="
|
||||
echo "STEP 1: Starting parallel video conversion"
|
||||
echo " Workers: ${NUM_WORKERS}"
|
||||
echo " Input: ${REPO_ID} (root: ${ROOT})"
|
||||
echo " Output: ${OUTPUT_DIR}"
|
||||
echo "=============================================="
|
||||
|
||||
python /admin/home/jade_choghari/lerobot/examples/port_datasets/slurm_convert_to_video.py\
|
||||
--repo-id "${REPO_ID}" \
|
||||
--root "${ROOT}" \
|
||||
--output-dir "${OUTPUT_DIR}" \
|
||||
--output-repo-id "${OUTPUT_REPO_ID}" \
|
||||
--vcodec "${VCODEC}" \
|
||||
--pix-fmt "${PIX_FMT}" \
|
||||
--g ${GOP_SIZE} \
|
||||
--crf ${CRF} \
|
||||
--fast-decode ${FAST_DECODE} \
|
||||
--num-image-workers ${NUM_IMAGE_WORKERS} \
|
||||
--logs-dir "${LOGS_DIR}" \
|
||||
--job-name "convert_video" \
|
||||
--slurm 1 \
|
||||
--workers ${NUM_WORKERS} \
|
||||
--partition "${PARTITION}" \
|
||||
--cpus-per-task ${CPUS_PER_TASK} \
|
||||
--mem-per-cpu "${MEM_PER_CPU}" \
|
||||
--time-limit "${TIME_LIMIT}"
|
||||
|
||||
echo ""
|
||||
echo "✓ Parallel conversion jobs submitted!"
|
||||
echo " Monitor with: squeue -u \$USER"
|
||||
echo " Check logs in: ${LOGS_DIR}/convert_video"
|
||||
echo ""
|
||||
echo "Wait for all jobs to complete before running Step 2."
|
||||
echo "You can check completion with: squeue -u \$USER | grep convert_video"
|
||||
echo ""
|
||||
echo "After all jobs complete, run Step 2 to aggregate shards:"
|
||||
echo " bash convert_to_video_parallel.sh aggregate"
|
||||
|
||||
###############################################################################
|
||||
# STEP 2: Aggregate Shards (run this after Step 1 completes)
|
||||
###############################################################################
|
||||
|
||||
if [ "$1" == "aggregate" ]; then
|
||||
echo ""
|
||||
echo "=============================================="
|
||||
echo "STEP 2: Aggregating video shards"
|
||||
echo " Shards: ${NUM_WORKERS}"
|
||||
echo " Input: ${OUTPUT_DIR}/shard_XXXX"
|
||||
echo " Output: ${OUTPUT_DIR}_final"
|
||||
echo "=============================================="
|
||||
|
||||
python slurm_aggregate_video_shards.py \
|
||||
--shards-dir "${OUTPUT_DIR}" \
|
||||
--output-dir "${OUTPUT_DIR}_final" \
|
||||
--output-repo-id "${OUTPUT_REPO_ID}" \
|
||||
--num-shards ${NUM_WORKERS} \
|
||||
--logs-dir "${LOGS_DIR}" \
|
||||
--job-name "aggregate_video" \
|
||||
--slurm 1 \
|
||||
--partition "${PARTITION}" \
|
||||
--cpus-per-task 16 \
|
||||
--mem-per-cpu "8G" \
|
||||
--time-limit "08:00:00"
|
||||
|
||||
echo ""
|
||||
echo "✓ Aggregation job submitted!"
|
||||
echo " Monitor with: squeue -u \$USER | grep aggregate_video"
|
||||
echo " Check logs in: ${LOGS_DIR}/aggregate_video"
|
||||
echo ""
|
||||
echo "After completion, your final dataset will be in:"
|
||||
echo " ${OUTPUT_DIR}_final"
|
||||
fi
|
||||
|
||||
###############################################################################
|
||||
# Helpful information
|
||||
###############################################################################
|
||||
|
||||
if [ "$1" != "aggregate" ]; then
|
||||
echo ""
|
||||
echo "=============================================="
|
||||
echo "WORKFLOW SUMMARY"
|
||||
echo "=============================================="
|
||||
echo ""
|
||||
echo "1. Step 1 is now running - it will:"
|
||||
echo " - Split episodes across ${NUM_WORKERS} workers"
|
||||
echo " - Each worker converts its episodes to video"
|
||||
echo " - Creates shard datasets in ${OUTPUT_DIR}/shard_XXXX"
|
||||
echo ""
|
||||
echo "2. After Step 1 completes, run Step 2:"
|
||||
echo " bash convert_to_video_parallel.sh aggregate"
|
||||
echo ""
|
||||
echo "3. Step 2 will merge all shards into a single dataset"
|
||||
echo ""
|
||||
echo "=============================================="
|
||||
fi
|
||||
|
||||
|
||||
@@ -0,0 +1,266 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Aggregate video dataset shards into a single dataset.
|
||||
|
||||
After parallel conversion using slurm_convert_to_video.py, this script merges
|
||||
all the shard datasets into one final dataset.
|
||||
|
||||
Example usage:
|
||||
python slurm_aggregate_video_shards.py \
|
||||
--shards-dir /fsx/jade_choghari/libero_video \
|
||||
--output-dir /fsx/jade_choghari/libero_video_final \
|
||||
--output-repo-id lerobot_video \
|
||||
--num-workers 100 \
|
||||
--partition cpu_partition \
|
||||
--cpus-per-task 16
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from datatrove.executor import LocalPipelineExecutor
|
||||
from datatrove.executor.slurm import SlurmPipelineExecutor
|
||||
from datatrove.pipeline.base import PipelineStep
|
||||
|
||||
|
||||
class AggregateVideoShards(PipelineStep):
|
||||
"""Pipeline step that aggregates video dataset shards."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
shards_dir: str | Path,
|
||||
output_dir: str | Path,
|
||||
output_repo_id: str,
|
||||
num_shards: int,
|
||||
):
|
||||
super().__init__()
|
||||
self.shards_dir = Path(shards_dir)
|
||||
self.output_dir = Path(output_dir)
|
||||
self.output_repo_id = output_repo_id
|
||||
self.num_shards = num_shards
|
||||
|
||||
def run(self, data=None, rank: int = 0, world_size: int = 1):
|
||||
"""Aggregate all shards into a single dataset."""
|
||||
from lerobot.datasets.dataset_tools import merge_datasets
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.utils.utils import init_logging
|
||||
|
||||
init_logging()
|
||||
|
||||
# Only worker 0 performs aggregation
|
||||
if rank != 0:
|
||||
logging.info(f"Worker {rank} skipping - only worker 0 performs aggregation")
|
||||
return
|
||||
|
||||
logging.info(f"Starting aggregation of {self.num_shards} shards")
|
||||
|
||||
# Collect all shard datasets
|
||||
shard_datasets = []
|
||||
for shard_idx in range(self.num_shards):
|
||||
shard_dir = self.shards_dir / f"shard_{shard_idx:04d}"
|
||||
if not shard_dir.exists():
|
||||
logging.warning(f"Shard directory not found: {shard_dir}")
|
||||
continue
|
||||
|
||||
# Find the repo_id for this shard
|
||||
shard_repo_id = f"{self.output_repo_id}_shard_{shard_idx:04d}"
|
||||
try:
|
||||
shard_dataset = LeRobotDataset(shard_repo_id, root=shard_dir)
|
||||
shard_datasets.append(shard_dataset)
|
||||
logging.info(
|
||||
f"Loaded shard {shard_idx}: {shard_dataset.meta.total_episodes} episodes, "
|
||||
f"{shard_dataset.meta.total_frames} frames"
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to load shard {shard_idx}: {e}")
|
||||
continue
|
||||
|
||||
if len(shard_datasets) == 0:
|
||||
raise ValueError(f"No valid shards found in {self.shards_dir}")
|
||||
|
||||
logging.info(f"Successfully loaded {len(shard_datasets)} shards, starting merge")
|
||||
|
||||
# Merge all shards
|
||||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||||
merged_dataset = merge_datasets(
|
||||
shard_datasets,
|
||||
output_repo_id=self.output_repo_id,
|
||||
output_dir=self.output_dir,
|
||||
)
|
||||
|
||||
logging.info("✓ Aggregation complete!")
|
||||
logging.info(f"Merged dataset saved to: {self.output_dir}")
|
||||
logging.info(f"Total episodes: {merged_dataset.meta.total_episodes}")
|
||||
logging.info(f"Total frames: {merged_dataset.meta.total_frames}")
|
||||
|
||||
|
||||
def make_aggregate_executor(
|
||||
shards_dir,
|
||||
output_dir,
|
||||
output_repo_id,
|
||||
num_shards,
|
||||
job_name,
|
||||
logs_dir,
|
||||
partition,
|
||||
cpus_per_task,
|
||||
mem_per_cpu,
|
||||
time_limit,
|
||||
slurm=True,
|
||||
):
|
||||
"""Create executor for shard aggregation."""
|
||||
kwargs = {
|
||||
"pipeline": [
|
||||
AggregateVideoShards(
|
||||
shards_dir=shards_dir,
|
||||
output_dir=output_dir,
|
||||
output_repo_id=output_repo_id,
|
||||
num_shards=num_shards,
|
||||
),
|
||||
],
|
||||
"logging_dir": str(logs_dir / job_name),
|
||||
}
|
||||
|
||||
if slurm:
|
||||
# Only need 1 worker for aggregation
|
||||
kwargs.update(
|
||||
{
|
||||
"job_name": job_name,
|
||||
"tasks": 1,
|
||||
"workers": 1,
|
||||
"time": time_limit,
|
||||
"partition": partition,
|
||||
"cpus_per_task": cpus_per_task,
|
||||
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
|
||||
}
|
||||
)
|
||||
executor = SlurmPipelineExecutor(**kwargs)
|
||||
else:
|
||||
kwargs.update(
|
||||
{
|
||||
"tasks": 1,
|
||||
"workers": 1,
|
||||
}
|
||||
)
|
||||
executor = LocalPipelineExecutor(**kwargs)
|
||||
|
||||
return executor
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Aggregate video dataset shards into a single dataset",
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--shards-dir",
|
||||
type=Path,
|
||||
required=True,
|
||||
help="Directory containing shard_XXXX subdirectories",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output-dir",
|
||||
type=Path,
|
||||
required=True,
|
||||
help="Output directory for the aggregated dataset",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output-repo-id",
|
||||
type=str,
|
||||
required=True,
|
||||
help="Repository ID for the aggregated dataset",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--num-shards",
|
||||
type=int,
|
||||
required=True,
|
||||
help="Number of shards to aggregate (should match --workers from conversion)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--logs-dir",
|
||||
type=Path,
|
||||
required=True,
|
||||
help="Path to logs directory for datatrove",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--job-name",
|
||||
type=str,
|
||||
default="aggregate_video_shards",
|
||||
help="Job name for SLURM",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--slurm",
|
||||
type=int,
|
||||
default=1,
|
||||
help="Launch over SLURM (1) or locally (0)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--partition",
|
||||
type=str,
|
||||
required=True,
|
||||
help="SLURM partition (use CPU partition)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cpus-per-task",
|
||||
type=int,
|
||||
default=16,
|
||||
help="Number of CPUs per task (aggregation can use more CPUs)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mem-per-cpu",
|
||||
type=str,
|
||||
default="8G",
|
||||
help="Memory per CPU",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--time-limit",
|
||||
type=str,
|
||||
default="08:00:00",
|
||||
help="Time limit for SLURM job",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Convert slurm flag to boolean
|
||||
slurm = args.slurm == 1
|
||||
|
||||
# Create and run executor
|
||||
executor = make_aggregate_executor(
|
||||
shards_dir=args.shards_dir,
|
||||
output_dir=args.output_dir,
|
||||
output_repo_id=args.output_repo_id,
|
||||
num_shards=args.num_shards,
|
||||
job_name=args.job_name,
|
||||
logs_dir=args.logs_dir,
|
||||
partition=args.partition,
|
||||
cpus_per_task=args.cpus_per_task,
|
||||
mem_per_cpu=args.mem_per_cpu,
|
||||
time_limit=args.time_limit,
|
||||
slurm=slurm,
|
||||
)
|
||||
|
||||
logging.info("Starting shard aggregation")
|
||||
executor.run()
|
||||
logging.info("Aggregation job submitted/completed")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
||||
@@ -0,0 +1,539 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Parallelize video conversion using SLURM and Datatrove.
|
||||
|
||||
This script converts an image-based LeRobot dataset to video format by distributing
|
||||
episodes across multiple workers for parallel processing.
|
||||
|
||||
Example usage:
|
||||
python slurm_convert_to_video.py \
|
||||
--repo-id lerobot \
|
||||
--root /fsx/jade_choghari/libero/ \
|
||||
--output-dir /fsx/jade_choghari/libero_video \
|
||||
--output-repo-id lerobot_video \
|
||||
--workers 100 \
|
||||
--partition cpu_partition \
|
||||
--cpus-per-task 8 \
|
||||
--logs-dir ./logs
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from datatrove.executor import LocalPipelineExecutor
|
||||
from datatrove.executor.slurm import SlurmPipelineExecutor
|
||||
from datatrove.pipeline.base import PipelineStep
|
||||
|
||||
|
||||
class ConvertEpisodesToVideo(PipelineStep):
|
||||
"""Pipeline step that converts episodes to videos in parallel."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
repo_id: str,
|
||||
root: str | None,
|
||||
output_dir: str,
|
||||
output_repo_id: str | None,
|
||||
vcodec: str = "libsvtav1",
|
||||
pix_fmt: str = "yuv420p",
|
||||
g: int = 2,
|
||||
crf: int = 30,
|
||||
fast_decode: int = 0,
|
||||
num_image_workers: int = 4,
|
||||
):
|
||||
super().__init__()
|
||||
self.repo_id = repo_id
|
||||
self.root = root
|
||||
self.output_dir = Path(output_dir)
|
||||
self.output_repo_id = output_repo_id or f"{repo_id}_video"
|
||||
self.vcodec = vcodec
|
||||
self.pix_fmt = pix_fmt
|
||||
self.g = g
|
||||
self.crf = crf
|
||||
self.fast_decode = fast_decode
|
||||
self.num_image_workers = num_image_workers
|
||||
|
||||
def run(self, data=None, rank: int = 0, world_size: int = 1):
|
||||
"""Process a shard of episodes."""
|
||||
from datasets.utils.tqdm import disable_progress_bars
|
||||
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.scripts.lerobot_edit_dataset import (
|
||||
encode_episode_videos,
|
||||
)
|
||||
from lerobot.utils.utils import init_logging
|
||||
import logging
|
||||
init_logging()
|
||||
disable_progress_bars()
|
||||
|
||||
logging.info(f"Worker {rank}/{world_size} starting video conversion")
|
||||
|
||||
# Load source dataset
|
||||
dataset = LeRobotDataset(self.repo_id, root=self.root)
|
||||
total_episodes = dataset.meta.total_episodes
|
||||
|
||||
# Determine which episodes this worker processes
|
||||
episodes_per_worker = total_episodes // world_size
|
||||
remainder = total_episodes % world_size
|
||||
|
||||
if rank < remainder:
|
||||
# First 'remainder' workers get one extra episode
|
||||
start_ep = rank * (episodes_per_worker + 1)
|
||||
end_ep = start_ep + episodes_per_worker + 1
|
||||
else:
|
||||
start_ep = remainder * (episodes_per_worker + 1) + (rank - remainder) * episodes_per_worker
|
||||
end_ep = start_ep + episodes_per_worker
|
||||
|
||||
episode_indices = list(range(start_ep, end_ep))
|
||||
|
||||
logging.info(
|
||||
f"Worker {rank} processing episodes {start_ep} to {end_ep-1} ({len(episode_indices)} episodes)"
|
||||
)
|
||||
|
||||
if len(episode_indices) == 0:
|
||||
logging.info(f"Worker {rank} has no episodes to process")
|
||||
return
|
||||
|
||||
# Create shard-specific output directory
|
||||
import shutil
|
||||
shard_output_dir = self.output_dir / f"shard_{rank:04d}"
|
||||
|
||||
# Remove existing directory to avoid conflicts with LeRobotDatasetMetadata.create
|
||||
if shard_output_dir.exists():
|
||||
logging.warning(f"Shard directory {shard_output_dir} already exists, removing it")
|
||||
shutil.rmtree(shard_output_dir)
|
||||
|
||||
# Import conversion function
|
||||
from lerobot.scripts.lerobot_edit_dataset import convert_dataset_to_videos
|
||||
|
||||
logging.info(
|
||||
f"Worker {rank} converting {len(episode_indices)} episodes with codec {self.vcodec}, CRF {self.crf}"
|
||||
)
|
||||
|
||||
# Convert this shard's episodes with remapped indices
|
||||
# We need to remap episode indices to start from 0 for proper file structure
|
||||
self._convert_shard_to_videos(
|
||||
dataset=dataset,
|
||||
shard_output_dir=shard_output_dir,
|
||||
shard_repo_id=f"{self.output_repo_id}_shard_{rank:04d}",
|
||||
episode_indices=episode_indices,
|
||||
)
|
||||
|
||||
logging.info(f"Worker {rank} completed successfully")
|
||||
|
||||
def _convert_shard_to_videos(
|
||||
self,
|
||||
dataset,
|
||||
shard_output_dir,
|
||||
shard_repo_id,
|
||||
episode_indices,
|
||||
):
|
||||
"""Convert a shard's episodes to videos with proper index remapping."""
|
||||
import shutil
|
||||
import pandas as pd
|
||||
from tqdm import tqdm
|
||||
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDatasetMetadata
|
||||
from lerobot.datasets.utils import write_info, write_stats, write_tasks
|
||||
from lerobot.datasets.video_utils import encode_video_frames, get_video_info
|
||||
from lerobot.scripts.lerobot_edit_dataset import (
|
||||
save_episode_images_for_video,
|
||||
_copy_data_without_images,
|
||||
)
|
||||
from lerobot.utils.constants import OBS_IMAGE
|
||||
|
||||
# Check that it's an image dataset
|
||||
if len(dataset.meta.video_keys) > 0:
|
||||
raise ValueError(
|
||||
f"This operation is for image datasets only. Video dataset provided: {dataset.repo_id}"
|
||||
)
|
||||
|
||||
# Get all image keys
|
||||
hf_dataset = dataset.hf_dataset.with_format(None)
|
||||
img_keys = [key for key in hf_dataset.features if key.startswith(OBS_IMAGE)]
|
||||
|
||||
if len(img_keys) == 0:
|
||||
raise ValueError(f"No image keys found in dataset {dataset.repo_id}")
|
||||
|
||||
logging.info(f"Converting {len(episode_indices)} episodes with {len(img_keys)} cameras")
|
||||
|
||||
# Create new features dict, converting image features to video features
|
||||
new_features = {}
|
||||
for key, value in dataset.meta.features.items():
|
||||
if key not in img_keys:
|
||||
new_features[key] = value
|
||||
else:
|
||||
# Convert image key to video format
|
||||
new_features[key] = value.copy()
|
||||
new_features[key]["dtype"] = "video"
|
||||
|
||||
# Create new metadata for video dataset
|
||||
new_meta = LeRobotDatasetMetadata.create(
|
||||
repo_id=shard_repo_id,
|
||||
fps=dataset.meta.fps,
|
||||
features=new_features,
|
||||
robot_type=dataset.meta.robot_type,
|
||||
root=shard_output_dir,
|
||||
use_videos=True,
|
||||
chunks_size=dataset.meta.chunks_size,
|
||||
data_files_size_in_mb=dataset.meta.data_files_size_in_mb,
|
||||
video_files_size_in_mb=dataset.meta.video_files_size_in_mb,
|
||||
)
|
||||
|
||||
# Create temporary directory for image extraction
|
||||
temp_dir = shard_output_dir / "temp_images"
|
||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Process each episode with REMAPPED indices (0, 1, 2, ...)
|
||||
all_episode_metadata = []
|
||||
fps = int(dataset.fps)
|
||||
|
||||
try:
|
||||
for new_ep_idx, orig_ep_idx in enumerate(tqdm(episode_indices, desc="Converting episodes")):
|
||||
# Get episode metadata from source using ORIGINAL index
|
||||
src_episode = dataset.meta.episodes[orig_ep_idx]
|
||||
episode_length = src_episode["length"]
|
||||
episode_duration = episode_length / dataset.fps
|
||||
|
||||
video_metadata = {}
|
||||
|
||||
# Encode videos for each camera
|
||||
for img_key in img_keys:
|
||||
# Save images temporarily using ORIGINAL episode index
|
||||
imgs_dir = temp_dir / f"episode_{orig_ep_idx:06d}" / img_key
|
||||
save_episode_images_for_video(
|
||||
dataset, imgs_dir, img_key, orig_ep_idx, self.num_image_workers
|
||||
)
|
||||
|
||||
# Determine chunk and file indices using NEW (remapped) episode index
|
||||
chunk_idx = new_ep_idx // new_meta.chunks_size
|
||||
file_idx = new_ep_idx % new_meta.chunks_size
|
||||
|
||||
# Create video path in the new dataset structure
|
||||
video_path = new_meta.root / new_meta.video_path.format(
|
||||
video_key=img_key, chunk_index=chunk_idx, file_index=file_idx
|
||||
)
|
||||
video_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Encode video
|
||||
encode_video_frames(
|
||||
imgs_dir=imgs_dir,
|
||||
video_path=video_path,
|
||||
fps=fps,
|
||||
vcodec=self.vcodec,
|
||||
pix_fmt=self.pix_fmt,
|
||||
g=self.g,
|
||||
crf=self.crf,
|
||||
fast_decode=self.fast_decode,
|
||||
overwrite=True,
|
||||
)
|
||||
|
||||
# Clean up temporary images
|
||||
shutil.rmtree(imgs_dir)
|
||||
|
||||
# Store video metadata
|
||||
video_metadata[img_key] = {
|
||||
f"videos/{img_key}/chunk_index": chunk_idx,
|
||||
f"videos/{img_key}/file_index": file_idx,
|
||||
f"videos/{img_key}/from_timestamp": 0.0,
|
||||
f"videos/{img_key}/to_timestamp": episode_duration,
|
||||
}
|
||||
|
||||
# Build episode metadata using NEW index
|
||||
episode_meta = {
|
||||
"episode_index": new_ep_idx,
|
||||
"length": episode_length,
|
||||
"dataset_from_index": new_ep_idx * episode_length,
|
||||
"dataset_to_index": (new_ep_idx + 1) * episode_length,
|
||||
}
|
||||
|
||||
# Add video metadata
|
||||
for img_key in img_keys:
|
||||
episode_meta.update(video_metadata[img_key])
|
||||
|
||||
# Add data chunk/file info
|
||||
if "data/chunk_index" in src_episode:
|
||||
episode_meta["data/chunk_index"] = src_episode["data/chunk_index"]
|
||||
episode_meta["data/file_index"] = src_episode["data/file_index"]
|
||||
|
||||
all_episode_metadata.append(episode_meta)
|
||||
|
||||
# Copy and transform data files (removing image columns)
|
||||
_copy_data_without_images(dataset, new_meta, episode_indices, img_keys)
|
||||
|
||||
# Save episode metadata
|
||||
episodes_df = pd.DataFrame(all_episode_metadata)
|
||||
episodes_path = new_meta.root / "meta" / "episodes" / "chunk-000" / "file-000.parquet"
|
||||
episodes_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
episodes_df.to_parquet(episodes_path, index=False)
|
||||
|
||||
# Update metadata info
|
||||
new_meta.info["total_episodes"] = len(episode_indices)
|
||||
new_meta.info["total_frames"] = sum(ep["length"] for ep in all_episode_metadata)
|
||||
new_meta.info["total_tasks"] = dataset.meta.total_tasks
|
||||
new_meta.info["splits"] = {"train": f"0:{len(episode_indices)}"}
|
||||
|
||||
# Update video info for all image keys using the actual first video file
|
||||
for img_key in img_keys:
|
||||
if not new_meta.features[img_key].get("info", None):
|
||||
# Use the first actually created video file
|
||||
chunk_idx = all_episode_metadata[0][f"videos/{img_key}/chunk_index"]
|
||||
file_idx = all_episode_metadata[0][f"videos/{img_key}/file_index"]
|
||||
video_path = new_meta.root / new_meta.video_path.format(
|
||||
video_key=img_key, chunk_index=chunk_idx, file_index=file_idx
|
||||
)
|
||||
new_meta.info["features"][img_key]["info"] = get_video_info(video_path)
|
||||
|
||||
write_info(new_meta.info, new_meta.root)
|
||||
|
||||
# Copy stats and tasks
|
||||
if dataset.meta.stats is not None:
|
||||
# Remove image stats
|
||||
new_stats = {k: v for k, v in dataset.meta.stats.items() if k not in img_keys}
|
||||
write_stats(new_stats, new_meta.root)
|
||||
|
||||
if dataset.meta.tasks is not None:
|
||||
write_tasks(dataset.meta.tasks, new_meta.root)
|
||||
|
||||
finally:
|
||||
# Clean up temporary directory
|
||||
if temp_dir.exists():
|
||||
shutil.rmtree(temp_dir)
|
||||
|
||||
|
||||
def make_convert_executor(
|
||||
repo_id,
|
||||
root,
|
||||
output_dir,
|
||||
output_repo_id,
|
||||
vcodec,
|
||||
pix_fmt,
|
||||
g,
|
||||
crf,
|
||||
fast_decode,
|
||||
num_image_workers,
|
||||
job_name,
|
||||
logs_dir,
|
||||
workers,
|
||||
partition,
|
||||
cpus_per_task,
|
||||
mem_per_cpu,
|
||||
time_limit,
|
||||
slurm=True,
|
||||
):
|
||||
"""Create executor for parallel video conversion."""
|
||||
kwargs = {
|
||||
"pipeline": [
|
||||
ConvertEpisodesToVideo(
|
||||
repo_id=repo_id,
|
||||
root=root,
|
||||
output_dir=output_dir,
|
||||
output_repo_id=output_repo_id,
|
||||
vcodec=vcodec,
|
||||
pix_fmt=pix_fmt,
|
||||
g=g,
|
||||
crf=crf,
|
||||
fast_decode=fast_decode,
|
||||
num_image_workers=num_image_workers,
|
||||
),
|
||||
],
|
||||
"logging_dir": str(logs_dir / job_name),
|
||||
}
|
||||
|
||||
if slurm:
|
||||
kwargs.update(
|
||||
{
|
||||
"job_name": job_name,
|
||||
"tasks": workers, # Number of parallel tasks = number of workers
|
||||
"workers": workers,
|
||||
"time": time_limit,
|
||||
"partition": partition,
|
||||
"cpus_per_task": cpus_per_task,
|
||||
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
|
||||
}
|
||||
)
|
||||
executor = SlurmPipelineExecutor(**kwargs)
|
||||
else:
|
||||
kwargs.update(
|
||||
{
|
||||
"tasks": workers,
|
||||
"workers": 1, # Local mode: sequential
|
||||
}
|
||||
)
|
||||
executor = LocalPipelineExecutor(**kwargs)
|
||||
|
||||
return executor
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Parallelize video conversion across SLURM workers",
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
)
|
||||
|
||||
# Input/Output paths
|
||||
parser.add_argument(
|
||||
"--repo-id",
|
||||
type=str,
|
||||
required=True,
|
||||
help="Repository ID of the source image dataset",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--root",
|
||||
type=str,
|
||||
default=None,
|
||||
help="Root directory containing the source dataset (default: HF cache)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output-dir",
|
||||
type=str,
|
||||
required=True,
|
||||
help="Output directory for the video dataset",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output-repo-id",
|
||||
type=str,
|
||||
default=None,
|
||||
help="Repository ID for output dataset (default: <repo_id>_video)",
|
||||
)
|
||||
|
||||
# Video encoding parameters
|
||||
parser.add_argument(
|
||||
"--vcodec",
|
||||
type=str,
|
||||
default="libsvtav1",
|
||||
help="Video codec",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pix-fmt",
|
||||
type=str,
|
||||
default="yuv420p",
|
||||
help="Pixel format",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--g",
|
||||
type=int,
|
||||
default=2,
|
||||
help="GOP size (group of pictures)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--crf",
|
||||
type=int,
|
||||
default=30,
|
||||
help="Constant rate factor (quality)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--fast-decode",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Fast decode tuning",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--num-image-workers",
|
||||
type=int,
|
||||
default=4,
|
||||
help="Number of threads per worker for saving images",
|
||||
)
|
||||
|
||||
# SLURM parameters
|
||||
parser.add_argument(
|
||||
"--logs-dir",
|
||||
type=Path,
|
||||
required=True,
|
||||
help="Path to logs directory for datatrove",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--job-name",
|
||||
type=str,
|
||||
default="convert_to_video",
|
||||
help="Job name for SLURM",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--slurm",
|
||||
type=int,
|
||||
default=1,
|
||||
help="Launch over SLURM (1) or locally (0)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--workers",
|
||||
type=int,
|
||||
default=100,
|
||||
help="Number of parallel workers (each processes different episodes)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--partition",
|
||||
type=str,
|
||||
required=True,
|
||||
help="SLURM partition (use CPU partition)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cpus-per-task",
|
||||
type=int,
|
||||
default=8,
|
||||
help="Number of CPUs per SLURM task",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mem-per-cpu",
|
||||
type=str,
|
||||
default="4G",
|
||||
help="Memory per CPU",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--time-limit",
|
||||
type=str,
|
||||
default="08:00:00",
|
||||
help="Time limit for SLURM job",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Convert slurm flag to boolean
|
||||
slurm = args.slurm == 1
|
||||
|
||||
# Create and run executor
|
||||
executor = make_convert_executor(
|
||||
repo_id=args.repo_id,
|
||||
root=args.root,
|
||||
output_dir=args.output_dir,
|
||||
output_repo_id=args.output_repo_id,
|
||||
vcodec=args.vcodec,
|
||||
pix_fmt=args.pix_fmt,
|
||||
g=args.g,
|
||||
crf=args.crf,
|
||||
fast_decode=args.fast_decode,
|
||||
num_image_workers=args.num_image_workers,
|
||||
job_name=args.job_name,
|
||||
logs_dir=args.logs_dir,
|
||||
workers=args.workers,
|
||||
partition=args.partition,
|
||||
cpus_per_task=args.cpus_per_task,
|
||||
mem_per_cpu=args.mem_per_cpu,
|
||||
time_limit=args.time_limit,
|
||||
slurm=slurm,
|
||||
)
|
||||
|
||||
logging.info(f"Starting video conversion with {args.workers} workers")
|
||||
executor.run()
|
||||
logging.info("All workers submitted/completed")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
==============================================
|
||||
STEP 1: Starting parallel video conversion
|
||||
Workers: 2
|
||||
Input: lerobot (root: /fsx/jade_choghari/vlabench-primitive/)
|
||||
Output: /fsx/jade_choghari/vlabench-primitive-encoded
|
||||
==============================================
|
||||
python: can't open file '/admin/home/jade_choghari/lerobot/slurm_convert_to_video.py': [Errno 2] No such file or directory
|
||||
@@ -81,14 +81,10 @@ class AdamWConfig(OptimizerConfig):
|
||||
eps: float = 1e-8
|
||||
weight_decay: float = 1e-2
|
||||
grad_clip_norm: float = 10.0
|
||||
fused: bool = False
|
||||
|
||||
def build(self, params: dict) -> torch.optim.Optimizer:
|
||||
kwargs = asdict(self)
|
||||
kwargs.pop("grad_clip_norm")
|
||||
# Fused optimizer only works on CUDA
|
||||
if kwargs.get("fused") and not torch.cuda.is_available():
|
||||
kwargs["fused"] = False
|
||||
return torch.optim.AdamW(params, **kwargs)
|
||||
|
||||
|
||||
|
||||
@@ -136,7 +136,6 @@ class ACTConfig(PreTrainedConfig):
|
||||
optimizer_lr: float = 1e-5
|
||||
optimizer_weight_decay: float = 1e-4
|
||||
optimizer_lr_backbone: float = 1e-5
|
||||
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
@@ -165,7 +164,6 @@ class ACTConfig(PreTrainedConfig):
|
||||
return AdamWConfig(
|
||||
lr=self.optimizer_lr,
|
||||
weight_decay=self.optimizer_weight_decay,
|
||||
fused=self.optimizer_fused,
|
||||
)
|
||||
|
||||
def get_scheduler_preset(self) -> None:
|
||||
|
||||
@@ -94,7 +94,6 @@ class GrootConfig(PreTrainedConfig):
|
||||
optimizer_betas: tuple[float, float] = (0.95, 0.999)
|
||||
optimizer_eps: float = 1e-8
|
||||
optimizer_weight_decay: float = 1e-5
|
||||
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
|
||||
warmup_ratio: float = 0.05
|
||||
use_bf16: bool = True
|
||||
|
||||
@@ -175,7 +174,6 @@ class GrootConfig(PreTrainedConfig):
|
||||
betas=self.optimizer_betas,
|
||||
eps=self.optimizer_eps,
|
||||
weight_decay=self.optimizer_weight_decay,
|
||||
fused=self.optimizer_fused,
|
||||
)
|
||||
|
||||
def get_scheduler_preset(self) -> CosineDecayWithWarmupSchedulerConfig:
|
||||
|
||||
@@ -23,8 +23,6 @@ from lerobot.optim.schedulers import CosineDecayWithWarmupSchedulerConfig
|
||||
from lerobot.policies.rtc.configuration_rtc import RTCConfig
|
||||
from lerobot.utils.constants import OBS_IMAGES
|
||||
|
||||
DEFAULT_IMAGE_SIZE = 224
|
||||
|
||||
|
||||
@PreTrainedConfig.register_subclass("pi0")
|
||||
@dataclass
|
||||
@@ -53,10 +51,7 @@ class PI0Config(PreTrainedConfig):
|
||||
# Real-Time Chunking (RTC) configuration
|
||||
rtc_config: RTCConfig | None = None
|
||||
|
||||
image_resolution: tuple[int, int] = (
|
||||
DEFAULT_IMAGE_SIZE,
|
||||
DEFAULT_IMAGE_SIZE,
|
||||
) # see openpi `preprocessing_pytorch.py`
|
||||
image_resolution: tuple[int, int] = (224, 224) # see openpi `preprocessing_pytorch.py`
|
||||
|
||||
# Add empty images. Used to add empty cameras when no image features are present.
|
||||
empty_cameras: int = 0
|
||||
@@ -74,7 +69,6 @@ class PI0Config(PreTrainedConfig):
|
||||
gradient_checkpointing: bool = False # Enable gradient checkpointing for memory optimization
|
||||
compile_model: bool = False # Whether to use torch.compile for model optimization
|
||||
compile_mode: str = "max-autotune" # Torch compile mode
|
||||
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
|
||||
device: str | None = None # Device to use for the model (None = auto-detect)
|
||||
|
||||
# Optimizer settings: see openpi `AdamW``
|
||||
@@ -142,7 +136,6 @@ class PI0Config(PreTrainedConfig):
|
||||
eps=self.optimizer_eps,
|
||||
weight_decay=self.optimizer_weight_decay,
|
||||
grad_clip_norm=self.optimizer_grad_clip_norm,
|
||||
fused=self.optimizer_fused,
|
||||
)
|
||||
|
||||
def get_scheduler_preset(self):
|
||||
|
||||
@@ -41,7 +41,7 @@ else:
|
||||
PaliGemmaForConditionalGeneration = None
|
||||
|
||||
from lerobot.configs.policies import PreTrainedConfig
|
||||
from lerobot.policies.pi0.configuration_pi0 import DEFAULT_IMAGE_SIZE, PI0Config
|
||||
from lerobot.policies.pi0.configuration_pi0 import PI0Config
|
||||
from lerobot.policies.pretrained import PreTrainedPolicy, T
|
||||
from lerobot.policies.rtc.modeling_rtc import RTCProcessor
|
||||
from lerobot.utils.constants import (
|
||||
@@ -337,7 +337,6 @@ class PaliGemmaWithExpertModel(
|
||||
action_expert_config,
|
||||
use_adarms=None,
|
||||
precision: Literal["bfloat16", "float32"] = "bfloat16",
|
||||
image_size: int = DEFAULT_IMAGE_SIZE,
|
||||
):
|
||||
if use_adarms is None:
|
||||
use_adarms = [False, False]
|
||||
@@ -357,7 +356,6 @@ class PaliGemmaWithExpertModel(
|
||||
vlm_config_hf.text_config.vocab_size = 257152
|
||||
vlm_config_hf.text_config.use_adarms = use_adarms[0]
|
||||
vlm_config_hf.text_config.adarms_cond_dim = vlm_config.width if use_adarms[0] else None
|
||||
vlm_config_hf.vision_config.image_size = image_size
|
||||
vlm_config_hf.vision_config.intermediate_size = 4304
|
||||
vlm_config_hf.vision_config.projection_dim = 2048
|
||||
vlm_config_hf.vision_config.projector_hidden_act = "gelu_fast"
|
||||
@@ -521,17 +519,11 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
|
||||
paligemma_config = get_gemma_config(config.paligemma_variant)
|
||||
action_expert_config = get_gemma_config(config.action_expert_variant)
|
||||
|
||||
if config.image_resolution[0] != config.image_resolution[1]:
|
||||
raise ValueError(
|
||||
f"PaliGemma expects square image resolution, invalid resolution: {config.image_resolution}"
|
||||
)
|
||||
|
||||
self.paligemma_with_expert = PaliGemmaWithExpertModel(
|
||||
paligemma_config,
|
||||
action_expert_config,
|
||||
use_adarms=[False, False],
|
||||
precision=config.dtype,
|
||||
image_size=config.image_resolution[0],
|
||||
)
|
||||
|
||||
self.action_in_proj = nn.Linear(config.max_action_dim, action_expert_config.width)
|
||||
@@ -820,13 +812,16 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
|
||||
)
|
||||
|
||||
dt = -1.0 / num_steps
|
||||
dt = torch.tensor(dt, dtype=torch.float32, device=device)
|
||||
|
||||
x_t = noise
|
||||
for step in range(num_steps):
|
||||
time = 1.0 + step * dt
|
||||
time_tensor = torch.tensor(time, dtype=torch.float32, device=device).expand(bsize)
|
||||
time = torch.tensor(1.0, dtype=torch.float32, device=device)
|
||||
while time >= -dt / 2:
|
||||
expanded_time = time.expand(bsize)
|
||||
|
||||
def denoise_step_partial_call(input_x_t, current_timestep=time_tensor):
|
||||
# Define a closure function to properly capture expanded_time
|
||||
# This avoids the lambda expression (E731) and loop variable binding (B023) issues
|
||||
def denoise_step_partial_call(input_x_t, current_timestep=expanded_time):
|
||||
return self.denoise_step(
|
||||
state=state,
|
||||
prefix_pad_masks=prefix_pad_masks,
|
||||
@@ -851,11 +846,15 @@ class PI0Pytorch(nn.Module): # see openpi `PI0Pytorch`
|
||||
else:
|
||||
v_t = denoise_step_partial_call(x_t)
|
||||
|
||||
x_t = x_t + dt * v_t
|
||||
# Euler step
|
||||
x_t += dt * v_t
|
||||
|
||||
# Record x_t and v_t after Euler step
|
||||
if self.rtc_processor is not None and self.rtc_processor.is_debug_enabled():
|
||||
self.rtc_processor.track(time=time, x_t=x_t, v_t=v_t)
|
||||
|
||||
time += dt
|
||||
|
||||
return x_t
|
||||
|
||||
def denoise_step(
|
||||
|
||||
@@ -22,8 +22,6 @@ from lerobot.optim.optimizers import AdamWConfig
|
||||
from lerobot.optim.schedulers import CosineDecayWithWarmupSchedulerConfig
|
||||
from lerobot.policies.rtc.configuration_rtc import RTCConfig
|
||||
|
||||
DEFAULT_IMAGE_SIZE = 224
|
||||
|
||||
|
||||
@PreTrainedConfig.register_subclass("pi05")
|
||||
@dataclass
|
||||
@@ -52,10 +50,7 @@ class PI05Config(PreTrainedConfig):
|
||||
# Real-Time Chunking (RTC) configuration
|
||||
rtc_config: RTCConfig | None = None
|
||||
|
||||
image_resolution: tuple[int, int] = (
|
||||
DEFAULT_IMAGE_SIZE,
|
||||
DEFAULT_IMAGE_SIZE,
|
||||
) # see openpi `preprocessing_pytorch.py`
|
||||
image_resolution: tuple[int, int] = (224, 224) # see openpi `preprocessing_pytorch.py`
|
||||
|
||||
# Add empty images. Used to add empty cameras when no image features are present.
|
||||
empty_cameras: int = 0
|
||||
@@ -74,7 +69,6 @@ class PI05Config(PreTrainedConfig):
|
||||
gradient_checkpointing: bool = False # Enable gradient checkpointing for memory optimization
|
||||
compile_model: bool = False # Whether to use torch.compile for model optimization
|
||||
compile_mode: str = "max-autotune" # Torch compile mode
|
||||
optimizer_fused: bool = False # Use CUDA fused AdamW kernel
|
||||
device: str | None = None # Device to use for the model (None = auto-detect)
|
||||
|
||||
# Optimizer settings: see openpi `AdamW`
|
||||
@@ -142,7 +136,6 @@ class PI05Config(PreTrainedConfig):
|
||||
eps=self.optimizer_eps,
|
||||
weight_decay=self.optimizer_weight_decay,
|
||||
grad_clip_norm=self.optimizer_grad_clip_norm,
|
||||
fused=self.optimizer_fused,
|
||||
)
|
||||
|
||||
def get_scheduler_preset(self):
|
||||
|
||||
@@ -41,7 +41,7 @@ else:
|
||||
PaliGemmaForConditionalGeneration = None
|
||||
|
||||
from lerobot.configs.policies import PreTrainedConfig
|
||||
from lerobot.policies.pi05.configuration_pi05 import DEFAULT_IMAGE_SIZE, PI05Config
|
||||
from lerobot.policies.pi05.configuration_pi05 import PI05Config
|
||||
from lerobot.policies.pretrained import PreTrainedPolicy, T
|
||||
from lerobot.policies.rtc.modeling_rtc import RTCProcessor
|
||||
from lerobot.utils.constants import (
|
||||
@@ -336,7 +336,6 @@ class PaliGemmaWithExpertModel(
|
||||
action_expert_config,
|
||||
use_adarms=None,
|
||||
precision: Literal["bfloat16", "float32"] = "bfloat16",
|
||||
image_size: int = DEFAULT_IMAGE_SIZE,
|
||||
):
|
||||
if use_adarms is None:
|
||||
use_adarms = [False, False]
|
||||
@@ -356,7 +355,6 @@ class PaliGemmaWithExpertModel(
|
||||
vlm_config_hf.text_config.vocab_size = 257152
|
||||
vlm_config_hf.text_config.use_adarms = use_adarms[0]
|
||||
vlm_config_hf.text_config.adarms_cond_dim = vlm_config.width if use_adarms[0] else None
|
||||
vlm_config_hf.vision_config.image_size = image_size
|
||||
vlm_config_hf.vision_config.intermediate_size = 4304
|
||||
vlm_config_hf.vision_config.projection_dim = 2048
|
||||
vlm_config_hf.vision_config.projector_hidden_act = "gelu_fast"
|
||||
@@ -520,17 +518,11 @@ class PI05Pytorch(nn.Module): # see openpi `PI0Pytorch`
|
||||
paligemma_config = get_gemma_config(config.paligemma_variant)
|
||||
action_expert_config = get_gemma_config(config.action_expert_variant)
|
||||
|
||||
if config.image_resolution[0] != config.image_resolution[1]:
|
||||
raise ValueError(
|
||||
f"PaliGemma expects square image resolution, invalid resolution: {config.image_resolution}"
|
||||
)
|
||||
|
||||
self.paligemma_with_expert = PaliGemmaWithExpertModel(
|
||||
paligemma_config,
|
||||
action_expert_config,
|
||||
use_adarms=[False, True],
|
||||
precision=config.dtype,
|
||||
image_size=config.image_resolution[0],
|
||||
)
|
||||
|
||||
self.action_in_proj = nn.Linear(config.max_action_dim, action_expert_config.width)
|
||||
@@ -795,13 +787,16 @@ class PI05Pytorch(nn.Module): # see openpi `PI0Pytorch`
|
||||
)
|
||||
|
||||
dt = -1.0 / num_steps
|
||||
dt = torch.tensor(dt, dtype=torch.float32, device=device)
|
||||
|
||||
x_t = noise
|
||||
for step in range(num_steps):
|
||||
time = 1.0 + step * dt
|
||||
time_tensor = torch.tensor(time, dtype=torch.float32, device=device).expand(bsize)
|
||||
time = torch.tensor(1.0, dtype=torch.float32, device=device)
|
||||
while time >= -dt / 2:
|
||||
expanded_time = time.expand(bsize)
|
||||
|
||||
def denoise_step_partial_call(input_x_t, current_timestep=time_tensor):
|
||||
# Define a closure function to properly capture expanded_time
|
||||
# This avoids the lambda expression (E731) and loop variable binding (B023) issues
|
||||
def denoise_step_partial_call(input_x_t, current_timestep=expanded_time):
|
||||
return self.denoise_step(
|
||||
prefix_pad_masks=prefix_pad_masks,
|
||||
past_key_values=past_key_values,
|
||||
@@ -825,11 +820,15 @@ class PI05Pytorch(nn.Module): # see openpi `PI0Pytorch`
|
||||
else:
|
||||
v_t = denoise_step_partial_call(x_t)
|
||||
|
||||
x_t = x_t + dt * v_t
|
||||
# Euler step
|
||||
x_t += dt * v_t
|
||||
|
||||
# Record x_t and v_t after Euler step
|
||||
if self.rtc_processor is not None and self.rtc_processor.is_debug_enabled():
|
||||
self.rtc_processor.track(time=time, x_t=x_t, v_t=v_t)
|
||||
|
||||
time += dt
|
||||
|
||||
return x_t
|
||||
|
||||
def denoise_step(
|
||||
|
||||
@@ -79,7 +79,6 @@ class SmolVLAConfig(PreTrainedConfig):
|
||||
optimizer_eps: float = 1e-8
|
||||
optimizer_weight_decay: float = 1e-10
|
||||
optimizer_grad_clip_norm: float = 10
|
||||
optimizer_fused: bool = False
|
||||
|
||||
scheduler_warmup_steps: int = 1_000
|
||||
scheduler_decay_steps: int = 30_000
|
||||
@@ -137,7 +136,6 @@ class SmolVLAConfig(PreTrainedConfig):
|
||||
eps=self.optimizer_eps,
|
||||
weight_decay=self.optimizer_weight_decay,
|
||||
grad_clip_norm=self.optimizer_grad_clip_norm,
|
||||
fused=self.optimizer_fused,
|
||||
)
|
||||
|
||||
def get_scheduler_preset(self):
|
||||
|
||||
@@ -783,15 +783,18 @@ class VLAFlowMatching(nn.Module):
|
||||
use_cache=self.config.use_cache,
|
||||
fill_kv_cache=True,
|
||||
)
|
||||
num_steps = self.config.num_steps
|
||||
dt = -1.0 / num_steps
|
||||
dt = -1.0 / self.config.num_steps
|
||||
dt = torch.tensor(dt, dtype=torch.float32, device=device)
|
||||
|
||||
x_t = noise
|
||||
for step in range(num_steps):
|
||||
time = 1.0 + step * dt
|
||||
time_tensor = torch.tensor(time, dtype=torch.float32, device=device).expand(bsize)
|
||||
time = torch.tensor(1.0, dtype=torch.float32, device=device)
|
||||
|
||||
def denoise_step_partial_call(input_x_t, current_timestep=time_tensor):
|
||||
while time >= -dt / 2:
|
||||
expanded_time = time.expand(bsize)
|
||||
|
||||
# Define a closure function to properly capture expanded_time
|
||||
# This avoids the lambda expression (E731) and loop variable binding (B023) issues
|
||||
def denoise_step_partial_call(input_x_t, current_timestep=expanded_time):
|
||||
return self.denoise_step(
|
||||
x_t=input_x_t,
|
||||
prefix_pad_masks=prefix_pad_masks,
|
||||
@@ -815,11 +818,15 @@ class VLAFlowMatching(nn.Module):
|
||||
else:
|
||||
v_t = denoise_step_partial_call(x_t)
|
||||
|
||||
x_t = x_t + dt * v_t
|
||||
# Euler step
|
||||
x_t += dt * v_t
|
||||
|
||||
# Record x_t and v_t after Euler step (other params are recorded in rtc_processor.denoise_step)
|
||||
if self.rtc_processor is not None and self.rtc_processor.is_debug_enabled():
|
||||
self.rtc_processor.track(time=time, x_t=x_t, v_t=v_t)
|
||||
|
||||
time += dt
|
||||
|
||||
return x_t
|
||||
|
||||
def denoise_step(
|
||||
|
||||
Reference in New Issue
Block a user