From 4cbd91a04e5192c1085d6aac3bd259a68b3e5929 Mon Sep 17 00:00:00 2001 From: pepijn223 Date: Thu, 4 Jun 2026 20:05:25 +0200 Subject: [PATCH] chore: drop one-off bench/build/train scripts from the PR Remove development-only tooling that doesn't belong in the PR: - examples/benchmark/* (pi052 step/kernel benchmark slurm + harness) - examples/port_datasets/slurm_build_robocasa_composite_seen.py and src/lerobot/scripts/build_robocasa_composite_seen.py (composite_seen dataset build scripts) - scripts/build_episode_filter.py, scripts/build_robocasa_smoke.sh, scripts/train_pi052_human300_exclude_unannotated.sh None are imported by the library, tests, or entry points. Co-authored-by: Cursor --- examples/benchmark/bench_pi052_kernels.slurm | 74 -- examples/benchmark/bench_pi052_step.py | 338 ------ examples/benchmark/bench_pi052_step.slurm | 36 - examples/benchmark/bench_pi052_step_v2.slurm | 39 - examples/benchmark/bench_pi052_step_v3.slurm | 36 - examples/benchmark/bench_pi052_step_v4.slurm | 41 - examples/benchmark/bench_pi052_step_v5.slurm | 33 - examples/benchmark/bench_pi052_step_v6.slurm | 31 - examples/benchmark/bench_pi052_step_v7.slurm | 39 - examples/benchmark/bench_pi052_step_v8.slurm | 36 - examples/benchmark/fsdp_pi052.yaml | 29 - .../slurm_build_robocasa_composite_seen.py | 1053 ----------------- scripts/build_episode_filter.py | 162 --- scripts/build_robocasa_smoke.sh | 47 - ...rain_pi052_human300_exclude_unannotated.sh | 115 -- .../scripts/build_robocasa_composite_seen.py | 345 ------ 16 files changed, 2454 deletions(-) delete mode 100644 examples/benchmark/bench_pi052_kernels.slurm delete mode 100644 examples/benchmark/bench_pi052_step.py delete mode 100644 examples/benchmark/bench_pi052_step.slurm delete mode 100644 examples/benchmark/bench_pi052_step_v2.slurm delete mode 100644 examples/benchmark/bench_pi052_step_v3.slurm delete mode 100644 examples/benchmark/bench_pi052_step_v4.slurm delete mode 100644 examples/benchmark/bench_pi052_step_v5.slurm delete mode 100644 examples/benchmark/bench_pi052_step_v6.slurm delete mode 100644 examples/benchmark/bench_pi052_step_v7.slurm delete mode 100644 examples/benchmark/bench_pi052_step_v8.slurm delete mode 100644 examples/benchmark/fsdp_pi052.yaml delete mode 100644 examples/port_datasets/slurm_build_robocasa_composite_seen.py delete mode 100644 scripts/build_episode_filter.py delete mode 100755 scripts/build_robocasa_smoke.sh delete mode 100755 scripts/train_pi052_human300_exclude_unannotated.sh delete mode 100644 src/lerobot/scripts/build_robocasa_composite_seen.py diff --git a/examples/benchmark/bench_pi052_kernels.slurm b/examples/benchmark/bench_pi052_kernels.slurm deleted file mode 100644 index 046ed7dfe..000000000 --- a/examples/benchmark/bench_pi052_kernels.slurm +++ /dev/null @@ -1,74 +0,0 @@ -#!/bin/bash -#SBATCH --job-name=bench-pi052-kernels -#SBATCH --partition=hopper-prod -#SBATCH --qos=high -#SBATCH --time=01:30:00 -#SBATCH --ntasks=1 -#SBATCH --gpus-per-task=1 -#SBATCH --output=/fsx/pepijn/logs/bench_pi052_kernels_%j.out - -# HF kernels exploration via Liger's apply_liger_kernel_to_paligemma. -# Baseline (SDPA, no kernels) vs. per-subkernel ablations vs. all-on. -# Same harness as bench_pi052_step.py — only the --kernels flag varies -# across runs so any delta is attributable to the patched op(s). -# -# Subkernels exercised: rope, rms_norm, geglu, layer_norm. -# Skipped: cross_entropy / fused_linear_cross_entropy — pi052 calls -# F.cross_entropy directly and bypasses PaliGemma's forward, so those -# patches wouldn't fire without model-code changes (separate PR). - -set -euo pipefail - -cd "${LEROBOT_ROOT:-$HOME/lerobot}" - -export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:$PATH" -export LD_LIBRARY_PATH="$HOME/miniconda3/lib:${LD_LIBRARY_PATH:-}" -export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-expandable_segments:True}" - -# /fsx triton cache is shared across nodes with different glibc versions -# — kernels built on one node trip GLIBC_2.34-not-found on another. Use -# a node-local cache per job to side-step that. -export TRITON_CACHE_DIR="/tmp/triton_${SLURM_JOB_ID}" -export TORCHINDUCTOR_CACHE_DIR="/tmp/torchinductor_${SLURM_JOB_ID}" -mkdir -p "$TRITON_CACHE_DIR" "$TORCHINDUCTOR_CACHE_DIR" - -echo "=== Node: $(hostname) ===" -nvidia-smi --query-gpu=name,driver_version,memory.total --format=csv,noheader -ldd --version | head -1 - -# Liger isn't in our standard env yet — install on the compute node so -# the slurm log captures the exact version that produced the numbers. -python -m pip install -q --upgrade 'liger-kernel' -python - <<'PY' || true -from importlib.metadata import version, PackageNotFoundError -try: - print("liger-kernel", version("liger-kernel")) -except PackageNotFoundError: - print("liger-kernel: not importable") -import liger_kernel.transformers as t -print("apply_liger_kernel_to_paligemma:", hasattr(t, "apply_liger_kernel_to_paligemma")) -PY - -run() { - echo - echo "--- $* ---" - python examples/benchmark/bench_pi052_step.py "$@" || true -} - -# -- Baseline (no kernels) at the BS we actually train at. -- -run --attn sdpa --batch-size 8 --kernels none -run --attn sdpa --batch-size 16 --kernels none - -# -- Per-subkernel ablations at BS=16 to isolate each contributor. -- -run --attn sdpa --batch-size 16 --kernels rms_norm -run --attn sdpa --batch-size 16 --kernels geglu -run --attn sdpa --batch-size 16 --kernels layer_norm -run --attn sdpa --batch-size 16 --kernels rope - -# -- All-on, both BS to compare against the matched baselines above. -- -run --attn sdpa --batch-size 8 --kernels all -run --attn sdpa --batch-size 16 --kernels all - -# -- Headroom check: does kernels-all let BS=24 fit (baseline OOMs near here)? -- -run --attn sdpa --batch-size 24 --kernels none -run --attn sdpa --batch-size 24 --kernels all diff --git a/examples/benchmark/bench_pi052_step.py b/examples/benchmark/bench_pi052_step.py deleted file mode 100644 index 00560d54b..000000000 --- a/examples/benchmark/bench_pi052_step.py +++ /dev/null @@ -1,338 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2026 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Benchmark ``PI052Policy.forward + backward`` on a single GPU. - -Compares the new SDPA attention path against the eager baseline by -monkeypatching ``sdpa_attention_forward`` before the first model -forward — so both runs share identical Q/K/V plumbing and only the -attention kernel differs. Reports steps/sec and peak GPU memory. - -SLURM-only: - - sbatch examples/benchmark/bench_pi052_step.slurm - -Or one-off: - - srun --partition=hopper-prod --qos=high --gpus=1 --time=15 \\ - python examples/benchmark/bench_pi052_step.py --attn sdpa --batch-size 8 -""" - -from __future__ import annotations - -import argparse -import gc -import math -import os -import time - -import torch - - -def _maybe_patch_eager() -> None: - """Swap ``sdpa_attention_forward`` for the original eager forward. - - Must be called BEFORE PI052Policy is instantiated — the layer - compute functions resolve the symbol at call time (module-level - lookup), so this patch covers both pi05 and pi052 KI paths.""" - from transformers.models.gemma import modeling_gemma - - from lerobot.policies.pi05 import modeling_pi05 - - modeling_pi05.sdpa_attention_forward = modeling_gemma.eager_attention_forward - - -_LIGER_SUBKERNELS = ("rope", "rms_norm", "geglu", "layer_norm") - - -def _maybe_patch_liger(spec: str) -> dict: - """Globally patch PaliGemma/Gemma/Siglip modules with Liger Triton kernels. - - Must be called BEFORE PI052Policy is instantiated — Liger replaces - classes inside ``transformers.models.{gemma,gemma2,siglip,paligemma}``, - so any model built after the call picks up the fused forwards. - - ``spec`` is a comma-separated subset of {rope, rms_norm, geglu, - layer_norm} (also ``all`` and ``none``). ``cross_entropy`` and - ``fused_linear_cross_entropy`` are intentionally skipped — pi052's - losses use ``F.cross_entropy`` directly (not ``nn.CrossEntropyLoss``) - and never traverse ``PaliGemmaForConditionalGeneration.forward``, - so neither patch would fire without invasive model-code changes. - """ - enabled = dict.fromkeys(_LIGER_SUBKERNELS, False) - if spec in ("", "none"): - return enabled - tokens = [t.strip() for t in spec.split(",") if t.strip()] - if tokens == ["all"]: - enabled = dict.fromkeys(_LIGER_SUBKERNELS, True) - else: - for t in tokens: - if t not in enabled: - raise SystemExit(f"Unknown liger subkernel: {t!r}. Choose from {_LIGER_SUBKERNELS} or 'all'.") - enabled[t] = True - - from liger_kernel.transformers import apply_liger_kernel_to_paligemma - - apply_liger_kernel_to_paligemma( - rope=enabled["rope"], - rms_norm=enabled["rms_norm"], - geglu=enabled["geglu"], - layer_norm=enabled["layer_norm"], - cross_entropy=False, - fused_linear_cross_entropy=False, - ) - return enabled - - -def _maybe_patch_flex() -> None: - """Swap ``sdpa_attention_forward`` for a FlexAttention-backed forward. - - Experimental: builds a per-call ``score_mod`` from the additive - mask and dispatches to a compiled ``flex_attention`` kernel. - - Known issue on torch 2.7.1: dynamo errors out with - ``FlexAttentionHigherOrderVariable() has no type`` when the - ``score_mod`` closure captures a per-call bias tensor. A proper - port needs ``create_block_mask(mask_mod, ...)`` plumbed at the - PI05Pytorch.forward level so a BlockMask object can be passed - down to the layer compute, not a per-call closure. Left as - future work; keep this stub for benchmark experimentation.""" - import torch - from torch.nn.attention.flex_attention import flex_attention - - from lerobot.policies.pi05 import modeling_pi05 - - compiled_flex = torch.compile(flex_attention, dynamic=True) - - def flex_forward(module, query, key, value, attention_mask, scaling, dropout=0.0): - n_rep = module.num_key_value_groups - if n_rep > 1: - key = key.repeat_interleave(n_rep, dim=1) - value = value.repeat_interleave(n_rep, dim=1) - - bias = attention_mask # (B, 1, Lq, Lk) additive - - def score_mod(score, b, h, q_idx, kv_idx): - return score + bias[b, 0, q_idx, kv_idx] - - attn_output = compiled_flex(query, key, value, score_mod=score_mod, scale=scaling) - return attn_output.transpose(1, 2).contiguous(), None - - modeling_pi05.sdpa_attention_forward = flex_forward - - -def _build_policy(args, device: torch.device): - """Random-init PI052Policy at production-relevant shapes.""" - from lerobot.configs.types import FeatureType, PolicyFeature - from lerobot.policies.pi052.configuration_pi052 import PI052Config - from lerobot.policies.pi052.modeling_pi052 import PI052Policy - - # Production has ``unfreeze_lm_head=True`` + ``text_loss_weight>0``, - # which flips ``train_expert_only=False`` in __post_init__ and - # makes the whole PaliGemma + Gemma-expert stack trainable. We - # mirror that here so the optimizer-state count reflects reality; - # the loss path still goes through ``PI05Policy.forward`` because - # ``text_labels`` / FAST tokens are absent from the synthetic batch - # (see ``PI052Policy.forward`` early-return). - config = PI052Config( - max_action_dim=args.action_dim, - max_state_dim=args.state_dim, - dtype=args.dtype, - knowledge_insulation=args.knowledge_insulation, - text_loss_weight=1e-3 if args.train_full else 0.0, - flow_loss_weight=1.0, - enable_fast_action_loss=False, - unfreeze_lm_head=args.train_full, - tokenizer_max_length=args.lang_tokens, - device="cuda", - compile_model=args.compile_model, - compile_mode=args.compile_mode, - ) - config.input_features = { - "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(args.state_dim,)), - "observation.images.base_0_rgb": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)), - } - config.output_features = { - "action": PolicyFeature(type=FeatureType.ACTION, shape=(args.action_dim,)), - } - policy = PI052Policy(config) - policy.to(device) - if args.gradient_checkpointing: - policy.model.gradient_checkpointing_enable() - policy.train() - return policy, config - - -def _build_batch(args, config, device: torch.device) -> dict: - """Synthetic batch matching the training-loop input contract.""" - from lerobot.utils.constants import ( - ACTION, - OBS_LANGUAGE_ATTENTION_MASK, - OBS_LANGUAGE_TOKENS, - ) - - B = args.batch_size - L = args.lang_tokens - return { - OBS_LANGUAGE_TOKENS: torch.randint(0, 250000, (B, L), device=device), - OBS_LANGUAGE_ATTENTION_MASK: torch.ones(B, L, dtype=torch.bool, device=device), - "observation.images.base_0_rgb": torch.rand(B, 3, 224, 224, device=device), - "observation.images.base_0_rgb_padding_mask": torch.ones(B, dtype=torch.bool, device=device), - "observation.state": torch.randn(B, args.state_dim, device=device), - ACTION: torch.randn(B, config.chunk_size, args.action_dim, device=device), - "action_is_pad": torch.zeros(B, config.chunk_size, dtype=torch.bool, device=device), - "task": ["bench task"] * B, - } - - -def _step(policy, batch, optimizer=None) -> torch.Tensor: - loss, _ = policy.forward(batch) - loss.backward() - if optimizer is not None: - optimizer.step() - optimizer.zero_grad(set_to_none=True) - else: - for p in policy.parameters(): - if p.grad is not None: - p.grad = None - return loss.detach() - - -def main() -> int: - parser = argparse.ArgumentParser() - parser.add_argument("--attn", choices=["sdpa", "eager", "flex"], default="sdpa") - parser.add_argument( - "--kernels", - default="none", - help=( - "Liger sub-kernels to enable, comma-separated. Choose from " - f"{_LIGER_SUBKERNELS} or use 'all' / 'none' (default). Applied " - "via apply_liger_kernel_to_paligemma() BEFORE model build." - ), - ) - parser.add_argument( - "--compile", - dest="compile_model", - action="store_true", - help="Set policy.config.compile_model=True (torch.compile the forward).", - ) - parser.add_argument( - "--compile-mode", - default="default", - help="torch.compile mode (default | reduce-overhead | max-autotune).", - ) - parser.add_argument("--batch-size", type=int, default=8) - parser.add_argument("--warmup", type=int, default=8) - parser.add_argument("--steps", type=int, default=40) - parser.add_argument("--lang-tokens", type=int, default=512) - parser.add_argument("--dtype", choices=["bfloat16", "float32"], default="bfloat16") - parser.add_argument("--action-dim", type=int, default=14) - parser.add_argument("--state-dim", type=int, default=14) - parser.add_argument("--knowledge-insulation", action="store_true", default=True) - parser.add_argument( - "--gradient-checkpointing", - dest="gradient_checkpointing", - action=argparse.BooleanOptionalAction, - default=True, - ) - parser.add_argument( - "--optimizer", - choices=["none", "adamw", "adamw_fused"], - default="adamw_fused", - help=( - "Whether to include an AdamW step in the timed iteration. " - "'none' mirrors the fwd+bwd-only original bench; 'adamw' / " - "'adamw_fused' add the realistic ~2x param-bytes optimizer " - "state and ``optimizer.step()`` cost." - ), - ) - parser.add_argument( - "--train-full", - action=argparse.BooleanOptionalAction, - default=True, - help=( - "Mirror production: unfreeze the PaliGemma backbone (full " - "~3B trainable params) instead of training only the 300M " - "action expert." - ), - ) - args = parser.parse_args() - - if not torch.cuda.is_available(): - raise SystemExit("Benchmark requires CUDA; submit via slurm (srun/sbatch).") - - if args.attn == "eager": - _maybe_patch_eager() - elif args.attn == "flex": - _maybe_patch_flex() - - liger_flags = _maybe_patch_liger(args.kernels) - - device = torch.device("cuda") - torch.cuda.reset_peak_memory_stats() - - policy, config = _build_policy(args, device) - batch = _build_batch(args, config, device) - - optimizer = None - trainable_params = sum(p.numel() for p in policy.parameters() if p.requires_grad) - if args.optimizer != "none": - trainable = [p for p in policy.parameters() if p.requires_grad] - optimizer = torch.optim.AdamW( - trainable, lr=5e-5, fused=(args.optimizer == "adamw_fused") - ) - - for _ in range(args.warmup): - _step(policy, batch, optimizer) - torch.cuda.synchronize() - - starter = torch.cuda.Event(enable_timing=True) - ender = torch.cuda.Event(enable_timing=True) - starter.record() - for _ in range(args.steps): - _step(policy, batch, optimizer) - ender.record() - torch.cuda.synchronize() - total_ms = starter.elapsed_time(ender) - step_ms = total_ms / args.steps - peak_gb = torch.cuda.max_memory_allocated() / (1024**3) - optim_gb = 0.0 - if optimizer is not None: - for st in optimizer.state.values(): - for v in st.values(): - if torch.is_tensor(v): - optim_gb += v.numel() * v.element_size() / (1024**3) - - liger_on = ",".join(k for k, v in liger_flags.items() if v) or "none" - name = ( - f"{args.attn:>5} | BS={args.batch_size} | L={args.lang_tokens} | " - f"KI={args.knowledge_insulation} | GC={args.gradient_checkpointing} | " - f"compile={args.compile_model} | liger={liger_on} | opt={args.optimizer} | dtype={args.dtype}" - ) - print( - f"{name}\n step_ms={step_ms:.1f} steps/sec={1000.0 / step_ms:.3f} " - f"peak_mem={peak_gb:.2f} GiB optim_state={optim_gb:.2f} GiB " - f"trainable_params={trainable_params / 1e9:.2f}B" - ) - - del policy, batch - gc.collect() - torch.cuda.empty_cache() - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/examples/benchmark/bench_pi052_step.slurm b/examples/benchmark/bench_pi052_step.slurm deleted file mode 100644 index 85b3b6063..000000000 --- a/examples/benchmark/bench_pi052_step.slurm +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/bash -#SBATCH --job-name=bench-pi052-attn -#SBATCH --partition=hopper-prod -#SBATCH --qos=high -#SBATCH --time=00:30:00 -#SBATCH --ntasks=1 -#SBATCH --gpus-per-task=1 -#SBATCH --output=/fsx/pepijn/logs/bench_pi052_%j.out - -set -euo pipefail - -cd "${LEROBOT_ROOT:-$HOME/lerobot}" - -export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:$PATH" -export LD_LIBRARY_PATH="$HOME/miniconda3/lib:${LD_LIBRARY_PATH:-}" -export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-expandable_segments:True}" - -echo "=== Node: $(hostname) ===" -nvidia-smi --query-gpu=name,driver_version,memory.total --format=csv,noheader - -python -c "import torch; print('torch', torch.__version__, 'cuda', torch.version.cuda)" - -run() { - echo - echo "--- $* ---" - python examples/benchmark/bench_pi052_step.py "$@" || true -} - -# Attention parity benchmark — same shapes, different attention kernel. -run --attn eager --batch-size 8 -run --attn sdpa --batch-size 8 - -# Headroom benchmark — does SDPA's memory cut allow a bigger micro-batch? -run --attn sdpa --batch-size 12 -run --attn sdpa --batch-size 16 -run --attn sdpa --batch-size 24 diff --git a/examples/benchmark/bench_pi052_step_v2.slurm b/examples/benchmark/bench_pi052_step_v2.slurm deleted file mode 100644 index 839286bd5..000000000 --- a/examples/benchmark/bench_pi052_step_v2.slurm +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -#SBATCH --job-name=bench-pi052-v2 -#SBATCH --partition=hopper-prod -#SBATCH --qos=high -#SBATCH --time=00:45:00 -#SBATCH --ntasks=1 -#SBATCH --gpus-per-task=1 -#SBATCH --output=/fsx/pepijn/logs/bench_pi052_v2_%j.out - -set -euo pipefail - -cd "${LEROBOT_ROOT:-$HOME/lerobot}" - -export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:$PATH" -export LD_LIBRARY_PATH="$HOME/miniconda3/lib:${LD_LIBRARY_PATH:-}" -export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-expandable_segments:True}" - -echo "=== Node: $(hostname) ===" -nvidia-smi --query-gpu=name,driver_version,memory.total --format=csv,noheader - -run() { - echo - echo "--- $* ---" - python examples/benchmark/bench_pi052_step.py "$@" || true -} - -# A: GC ON — see if the selective-AC change (one less recompute level) -# narrows the eager vs SDPA gap at BS=8. -run --attn eager --batch-size 8 -run --attn sdpa --batch-size 8 - -# B: GC OFF — isolate the raw attention-kernel cost & memory delta. -run --attn eager --batch-size 4 --no-gradient-checkpointing -run --attn sdpa --batch-size 4 --no-gradient-checkpointing - -# C: SDPA + GC headroom sweep — where does it OOM? -run --attn sdpa --batch-size 16 -run --attn sdpa --batch-size 24 -run --attn sdpa --batch-size 32 diff --git a/examples/benchmark/bench_pi052_step_v3.slurm b/examples/benchmark/bench_pi052_step_v3.slurm deleted file mode 100644 index 2cd426a05..000000000 --- a/examples/benchmark/bench_pi052_step_v3.slurm +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/bash -#SBATCH --job-name=bench-pi052-v3 -#SBATCH --partition=hopper-prod -#SBATCH --qos=high -#SBATCH --time=00:45:00 -#SBATCH --ntasks=1 -#SBATCH --gpus-per-task=1 -#SBATCH --output=/fsx/pepijn/logs/bench_pi052_v3_%j.out - -set -euo pipefail - -cd "${LEROBOT_ROOT:-$HOME/lerobot}" - -export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:$PATH" -export LD_LIBRARY_PATH="$HOME/miniconda3/lib:${LD_LIBRARY_PATH:-}" -export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-expandable_segments:True}" - -echo "=== Node: $(hostname) ===" -nvidia-smi --query-gpu=name,driver_version,memory.total --format=csv,noheader - -run() { - echo - echo "--- $* ---" - python examples/benchmark/bench_pi052_step.py "$@" || true -} - -# Compile sweep: does torch.compile + SDPA give a non-trivial boost on -# top of the bare SDPA path? -run --attn sdpa --batch-size 8 --compile -run --attn sdpa --batch-size 16 --compile - -# FlexAttention sweep (experimental): score_mod adds the additive bias -# in-kernel; expect a long first-step compile, then SDPA-or-better steady -# state. -run --attn flex --batch-size 8 -run --attn flex --batch-size 16 diff --git a/examples/benchmark/bench_pi052_step_v4.slurm b/examples/benchmark/bench_pi052_step_v4.slurm deleted file mode 100644 index f4b88dfa9..000000000 --- a/examples/benchmark/bench_pi052_step_v4.slurm +++ /dev/null @@ -1,41 +0,0 @@ -#!/bin/bash -#SBATCH --job-name=bench-pi052-v4 -#SBATCH --partition=hopper-prod -#SBATCH --qos=high -#SBATCH --time=01:00:00 -#SBATCH --ntasks=1 -#SBATCH --gpus-per-task=1 -#SBATCH --output=/fsx/pepijn/logs/bench_pi052_v4_%j.out - -set -euo pipefail - -cd "${LEROBOT_ROOT:-$HOME/lerobot}" - -export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:$PATH" -export LD_LIBRARY_PATH="$HOME/miniconda3/lib:${LD_LIBRARY_PATH:-}" -export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-expandable_segments:True}" - -# /fsx triton cache is shared across nodes with different glibc versions -# — kernels built on one node trip GLIBC_2.34-not-found on another. Use -# a node-local cache per job to side-step that. -export TRITON_CACHE_DIR="/tmp/triton_${SLURM_JOB_ID}" -export TORCHINDUCTOR_CACHE_DIR="/tmp/torchinductor_${SLURM_JOB_ID}" -mkdir -p "$TRITON_CACHE_DIR" "$TORCHINDUCTOR_CACHE_DIR" - -echo "=== Node: $(hostname) ===" -nvidia-smi --query-gpu=name,driver_version,memory.total --format=csv,noheader -ldd --version | head -1 - -run() { - echo - echo "--- $* ---" - python examples/benchmark/bench_pi052_step.py "$@" || true -} - -# compile path on top of SDPA + selective AC -run --attn sdpa --batch-size 8 --compile -run --attn sdpa --batch-size 16 --compile - -# FlexAttention experimental -run --attn flex --batch-size 8 -run --attn flex --batch-size 16 diff --git a/examples/benchmark/bench_pi052_step_v5.slurm b/examples/benchmark/bench_pi052_step_v5.slurm deleted file mode 100644 index 3a9fc102a..000000000 --- a/examples/benchmark/bench_pi052_step_v5.slurm +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash -#SBATCH --job-name=bench-pi052-v5 -#SBATCH --partition=hopper-prod -#SBATCH --qos=high -#SBATCH --time=00:45:00 -#SBATCH --ntasks=1 -#SBATCH --gpus-per-task=1 -#SBATCH --output=/fsx/pepijn/logs/bench_pi052_v5_%j.out - -set -euo pipefail - -cd "${LEROBOT_ROOT:-$HOME/lerobot}" - -export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:$PATH" -export LD_LIBRARY_PATH="$HOME/miniconda3/lib:${LD_LIBRARY_PATH:-}" -export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-expandable_segments:True}" -export TRITON_CACHE_DIR="/tmp/triton_${SLURM_JOB_ID}" -export TORCHINDUCTOR_CACHE_DIR="/tmp/torchinductor_${SLURM_JOB_ID}" -mkdir -p "$TRITON_CACHE_DIR" "$TORCHINDUCTOR_CACHE_DIR" - -echo "=== Node: $(hostname) ===" - -run() { - echo - echo "--- $* ---" - python examples/benchmark/bench_pi052_step.py "$@" || true -} - -# compile_mode=default (graph-only, no autotune) is the right knob with -# gradient checkpointing — max-autotune in v4 was 2x slower than no-compile. -run --attn sdpa --batch-size 8 --compile --compile-mode default -run --attn sdpa --batch-size 16 --compile --compile-mode default -run --attn sdpa --batch-size 8 --compile --compile-mode reduce-overhead diff --git a/examples/benchmark/bench_pi052_step_v6.slurm b/examples/benchmark/bench_pi052_step_v6.slurm deleted file mode 100644 index 99e016811..000000000 --- a/examples/benchmark/bench_pi052_step_v6.slurm +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/bash -#SBATCH --job-name=bench-pi052-v6-bs32 -#SBATCH --partition=hopper-prod -#SBATCH --qos=high -#SBATCH --time=00:30:00 -#SBATCH --ntasks=1 -#SBATCH --gpus-per-task=1 -#SBATCH --output=/fsx/pepijn/logs/bench_pi052_v6_%j.out - -set -euo pipefail - -cd "${LEROBOT_ROOT:-$HOME/lerobot}" - -export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:$PATH" -export LD_LIBRARY_PATH="$HOME/miniconda3/lib:${LD_LIBRARY_PATH:-}" -export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-expandable_segments:True}" -export TRITON_CACHE_DIR="/tmp/triton_${SLURM_JOB_ID}" -export TORCHINDUCTOR_CACHE_DIR="/tmp/torchinductor_${SLURM_JOB_ID}" -mkdir -p "$TRITON_CACHE_DIR" "$TORCHINDUCTOR_CACHE_DIR" - -echo "=== Node: $(hostname) ===" -nvidia-smi --query-gpu=name,memory.total --format=csv,noheader - -run() { - echo - echo "--- $* ---" - python examples/benchmark/bench_pi052_step.py "$@" || true -} - -# BS=32 with the production settings (SDPA + compile=default). -run --attn sdpa --batch-size 32 --compile --compile-mode default diff --git a/examples/benchmark/bench_pi052_step_v7.slurm b/examples/benchmark/bench_pi052_step_v7.slurm deleted file mode 100644 index 6afc528af..000000000 --- a/examples/benchmark/bench_pi052_step_v7.slurm +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -#SBATCH --job-name=bench-pi052-v7-opt -#SBATCH --partition=hopper-prod -#SBATCH --qos=high -#SBATCH --time=00:45:00 -#SBATCH --ntasks=1 -#SBATCH --gpus-per-task=1 -#SBATCH --output=/fsx/pepijn/logs/bench_pi052_v7_%j.out - -set -euo pipefail - -cd "${LEROBOT_ROOT:-$HOME/lerobot}" - -export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:$PATH" -export LD_LIBRARY_PATH="$HOME/miniconda3/lib:${LD_LIBRARY_PATH:-}" -export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-expandable_segments:True}" -export TRITON_CACHE_DIR="/tmp/triton_${SLURM_JOB_ID}" -export TORCHINDUCTOR_CACHE_DIR="/tmp/torchinductor_${SLURM_JOB_ID}" -mkdir -p "$TRITON_CACHE_DIR" "$TORCHINDUCTOR_CACHE_DIR" - -echo "=== Node: $(hostname) ===" -nvidia-smi --query-gpu=name,memory.total --format=csv,noheader - -run() { - echo - echo "--- $* ---" - python examples/benchmark/bench_pi052_step.py "$@" || true -} - -# Realistic full-step memory: fwd + bwd + AdamW step. The original -# sweep was fwd+bwd-only and undercounted memory by the optimizer- -# state size (~2x param bytes for AdamW). This run confirms BS=16 -# and BS=32 still fit with the optimizer in residency. -run --attn sdpa --batch-size 16 --compile --compile-mode default --optimizer adamw_fused -run --attn sdpa --batch-size 32 --compile --compile-mode default --optimizer adamw_fused - -# Without compile, in case the production cluster has compile issues. -run --attn sdpa --batch-size 16 --optimizer adamw_fused -run --attn sdpa --batch-size 32 --optimizer adamw_fused diff --git a/examples/benchmark/bench_pi052_step_v8.slurm b/examples/benchmark/bench_pi052_step_v8.slurm deleted file mode 100644 index a8ed8a8aa..000000000 --- a/examples/benchmark/bench_pi052_step_v8.slurm +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/bash -#SBATCH --job-name=bench-pi052-v8-bs40-dtype -#SBATCH --partition=hopper-prod -#SBATCH --qos=high -#SBATCH --time=00:45:00 -#SBATCH --ntasks=1 -#SBATCH --gpus-per-task=1 -#SBATCH --output=/fsx/pepijn/logs/bench_pi052_v8_%j.out - -set -euo pipefail - -cd "${LEROBOT_ROOT:-$HOME/lerobot}" - -export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:$PATH" -export LD_LIBRARY_PATH="$HOME/miniconda3/lib:${LD_LIBRARY_PATH:-}" -export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-expandable_segments:True}" -export TRITON_CACHE_DIR="/tmp/triton_${SLURM_JOB_ID}" -export TORCHINDUCTOR_CACHE_DIR="/tmp/torchinductor_${SLURM_JOB_ID}" -mkdir -p "$TRITON_CACHE_DIR" "$TORCHINDUCTOR_CACHE_DIR" - -echo "=== Node: $(hostname) ===" -nvidia-smi --query-gpu=name,memory.total --format=csv,noheader - -run() { - echo - echo "--- $* ---" - python examples/benchmark/bench_pi052_step.py "$@" || true -} - -# Confirm BS=40 fits on a single H100 with the optimizer in residency. -run --attn sdpa --batch-size 40 --compile --compile-mode default --optimizer adamw_fused - -# Dtype A/B at modest batch — fp32 needs ~2x the memory of bf16, so we -# drop to BS=4 to keep both runs comparable instead of OOMing fp32. -run --attn sdpa --batch-size 4 --optimizer adamw_fused --dtype bfloat16 -run --attn sdpa --batch-size 4 --optimizer adamw_fused --dtype float32 diff --git a/examples/benchmark/fsdp_pi052.yaml b/examples/benchmark/fsdp_pi052.yaml deleted file mode 100644 index f9f8b71da..000000000 --- a/examples/benchmark/fsdp_pi052.yaml +++ /dev/null @@ -1,29 +0,0 @@ -compute_environment: LOCAL_MACHINE -debug: false -distributed_type: FSDP -downcast_bf16: 'no' -enable_cpu_affinity: false -fsdp_config: - fsdp_activation_checkpointing: false - fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP - fsdp_backward_prefetch: BACKWARD_PRE - fsdp_cpu_ram_efficient_loading: true - fsdp_forward_prefetch: false - fsdp_offload_params: false - fsdp_reshard_after_forward: true - fsdp_state_dict_type: SHARDED_STATE_DICT - fsdp_sync_module_states: true - fsdp_transformer_layer_cls_to_wrap: GemmaDecoderLayer,SiglipEncoderLayer - fsdp_use_orig_params: true - fsdp_version: 2 -machine_rank: 0 -main_training_function: main -mixed_precision: bf16 -num_machines: 1 -num_processes: 8 -rdzv_backend: static -same_network: true -tpu_env: [] -tpu_use_cluster: false -tpu_use_sudo: false -use_cpu: false diff --git a/examples/port_datasets/slurm_build_robocasa_composite_seen.py b/examples/port_datasets/slurm_build_robocasa_composite_seen.py deleted file mode 100644 index a8cd3394f..000000000 --- a/examples/port_datasets/slurm_build_robocasa_composite_seen.py +++ /dev/null @@ -1,1053 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2026 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Rebuild RoboCasa tarballs into one unified LeRobot v3 dataset. - -Discovers tasks from RoboCasa's ``box_links_ds.json`` for a given ``--split`` -(``target`` or ``pretrain``) and ``--source`` (``human`` / ``mimicgen``), then -filters to a chosen ``--task-set`` (``composite_seen``, ``composite_unseen``, -``composite_all``, ``atomic``, ``composite_atomic``, ``all``) or an explicit -``--tasks`` list. Same code path produces the 16-task ``composite_seen`` slice, -the full 50-task target benchmark, the 300-task ``Human300`` pretraining -slice, or any 2-task smoke set. - -Per-rank, each datatrove worker: - -1. Downloads the assigned task tarball(s) directly from Box (resolved via the - ``box_links_ds.json`` bundled with the local ``robocasa`` clone). -2. Converts the extracted LeRobot v2.1 dataset to v3.0 in place. -3. Rewrites the per-task data into a per-rank shard with: - - the canonical RoboCasa task name in ``task`` - - standardized camera keys under ``observation.images.robot0_*`` - - a guaranteed flat ``observation.state`` (concatenation of base / EE / - gripper sub-keys when the source dataset stores them separately) - - a standardized ``action`` key - -A single aggregate worker then merges all shards into one unified dataset. - -Heavy lifting is parallelized via Datatrove + SLURM on CPU nodes. With -``--workers=16 --cpus-per-task=8`` on ``hopper-cpu`` you get 128 CPUs total -across the prepare phase (one task per worker, 8 CPUs each for ffmpeg / -parquet) and the aggregate phase reuses the same CPU budget on a single node. - -Typical hopper-cpu invocation:: - - uv run python examples/port_datasets/slurm_build_robocasa_composite_seen.py \\ - --repo-id=${HF_USER}/robocasa_composite_seen_v3 \\ - --work-dir=/fsx/${USER}/robocasa/datasets/v1.0 \\ - --robocasa-root=/fsx/${USER}/robocasa \\ - --split=target \\ - --source=human \\ - --partition=hopper-cpu \\ - --workers=16 \\ - --cpus-per-task=8 \\ - --mem-per-cpu=4G \\ - --time=24:00:00 \\ - --logs-dir=/fsx/${USER}/logs/robocasa - -Local debug (no SLURM):: - - uv run python examples/port_datasets/slurm_build_robocasa_composite_seen.py \\ - --repo-id=local/robocasa_composite_seen_v3_smoke \\ - --work-dir=/tmp/robocasa_smoke \\ - --robocasa-root=$HOME/robocasa \\ - --slurm=0 --workers=1 \\ - --tasks PrepareCoffee - -If ``robocasa`` is already importable in the runtime environment, you can omit -``--robocasa-root``; the box-links manifest will be located from the package. -""" - -from __future__ import annotations - -import argparse -import json -from pathlib import Path - -from datatrove.executor import LocalPipelineExecutor -from datatrove.executor.slurm import SlurmPipelineExecutor -from datatrove.pipeline.base import PipelineStep - -DEFAULT_SPLIT = "target" -DEFAULT_SOURCE = "human" -DEFAULT_ROBOT_TYPE = "robocasa" - -# RoboCasa365 target benchmark task groupings. Order matches the official docs. -COMPOSITE_SEEN_TASKS: list[str] = [ - "DeliverStraw", - "GetToastedBread", - "KettleBoiling", - "LoadDishwasher", - "PackIdenticalLunches", - "PreSoakPan", - "PrepareCoffee", - "RinseSinkBasin", - "ScrubCuttingBoard", - "SearingMeat", - "SetUpCuttingStation", - "StackBowlsCabinet", - "SteamInMicrowave", - "StirVegetables", - "StoreLeftoversInBowl", - "WashLettuce", -] - -COMPOSITE_UNSEEN_TASKS: list[str] = [ - "ArrangeBreadBasket", - "ArrangeTea", - "BreadSelection", - "CategorizeCondiments", - "CuttingToolSelection", - "GarnishPancake", - "GatherTableware", - "HeatKebabSandwich", - "MakeIceLemonade", - "PanTransfer", - "PortionHotDogs", - "RecycleBottlesByType", - "SeparateFreezerRack", - "WaffleReheat", - "WashFruitColander", - "WeighIngredients", -] - -ATOMIC_TASKS: list[str] = [ - "CloseBlenderLid", - "CloseFridge", - "CloseToasterOvenDoor", - "CoffeeSetupMug", - "NavigateKitchen", - "OpenCabinet", - "OpenDrawer", - "OpenStandMixerHead", - "PickPlaceCounterToCabinet", - "PickPlaceCounterToStove", - "PickPlaceDrawerToCounter", - "PickPlaceSinkToCounter", - "PickPlaceToasterToCounter", - "SlideDishwasherRack", - "TurnOffStove", - "TurnOnElectricKettle", - "TurnOnMicrowave", - "TurnOnSinkFaucet", -] - -TASK_SETS: dict[str, list[str]] = { - "composite_seen": COMPOSITE_SEEN_TASKS, - "composite_unseen": COMPOSITE_UNSEEN_TASKS, - "composite_all": COMPOSITE_SEEN_TASKS + COMPOSITE_UNSEEN_TASKS, - "atomic": ATOMIC_TASKS, - "composite_atomic": COMPOSITE_SEEN_TASKS + COMPOSITE_UNSEEN_TASKS + ATOMIC_TASKS, - "all": [], # sentinel — no filter -} - - -def _task_name_from_tar_key(tar_key: str) -> str: - parts = tar_key.split("/") - if len(parts) < 3: - raise ValueError(f"Unexpected RoboCasa tar key: {tar_key}") - return parts[2].removesuffix(".tar") - - -def _resolve_box_links_json( - box_links_json: Path | None, - robocasa_root: Path | None, -) -> Path: - if box_links_json is not None: - if not box_links_json.exists(): - raise FileNotFoundError(f"--box-links-json does not exist: {box_links_json}") - return box_links_json - - if robocasa_root is not None: - candidates = [ - robocasa_root / "models" / "assets" / "box_links" / "box_links_ds.json", - robocasa_root / "robocasa" / "models" / "assets" / "box_links" / "box_links_ds.json", - ] - for candidate in candidates: - if candidate.exists(): - return candidate - raise FileNotFoundError( - f"Could not find box_links_ds.json under --robocasa-root={robocasa_root}" - ) - - try: - import robocasa # noqa: PLC0415 - except ModuleNotFoundError as exc: - raise FileNotFoundError( - "Could not resolve RoboCasa box links. Pass --robocasa-root or --box-links-json, " - "or run in an environment where `robocasa` is importable." - ) from exc - - candidate = Path(robocasa.__path__[0]) / "models" / "assets" / "box_links" / "box_links_ds.json" - if not candidate.exists(): - raise FileNotFoundError(f"Resolved RoboCasa package, but box links file is missing: {candidate}") - return candidate - - -def _discover_tasks( - box_links_json: Path, - split: str = DEFAULT_SPLIT, - source: str | None = DEFAULT_SOURCE, -) -> list[dict[str, str]]: - with open(box_links_json) as f: - box_links: dict[str, str] = json.load(f) - - tasks: list[dict[str, str]] = [] - for tar_key in sorted(box_links): - parts = tar_key.split("/") - if len(parts) < 3 or parts[0] != split: - continue - - # RoboCasa registries can appear in two layouts: - # new: split////lerobot.tar - # old: split//.tar - if parts[1] in {"human", "mimicgen"}: - tar_source = parts[1] - else: - tar_source = "human" - - if source is not None and tar_source != source: - continue - - tasks.append( - { - "task_name": _task_name_from_tar_key(tar_key), - "tar_key": tar_key, - "source": tar_source, - "rel_path": tar_key.removesuffix(".tar"), - "shared_url": box_links[tar_key], - } - ) - return tasks - - -class PrepareRoboCasaUnifiedShards(PipelineStep): - """Build per-rank unified shards from RoboCasa task tarballs.""" - - def __init__( - self, - tasks: list[dict[str, str]], - output_repo_id: str, - work_dir: str, - split: str, - robot_type: str, - overwrite: bool = False, - cleanup_temp: bool = False, - max_episodes_per_task: int | None = None, - vcodec: str = "libsvtav1", - ): - super().__init__() - self.tasks = tasks - self.output_repo_id = output_repo_id - self.work_dir = Path(work_dir) - self.split = split - self.robot_type = robot_type - self.overwrite = overwrite - self.cleanup_temp = cleanup_temp - self.max_episodes_per_task = max_episodes_per_task - self.vcodec = vcodec - - def run(self, data=None, rank: int = 0, world_size: int = 1): - import copy - import json - import logging - import shutil - import tarfile - import urllib.request - - import numpy as np - from PIL import Image - - from lerobot.datasets.lerobot_dataset import LeRobotDataset - from lerobot.datasets.utils import DEFAULT_DATA_FILE_SIZE_IN_MB, DEFAULT_VIDEO_FILE_SIZE_IN_MB - from lerobot.scripts.convert_dataset_v21_to_v30 import ( - convert_data, - convert_episodes_metadata, - convert_info, - convert_tasks, - convert_videos, - validate_local_dataset_version, - ) - from lerobot.utils.utils import init_logging - - init_logging() - - target_image_keys = { - "observation.images.robot0_agentview_left": [ - "observation.images.robot0_agentview_left", - "left_image", - "observation.images.left_image", - ], - "observation.images.robot0_agentview_right": [ - "observation.images.robot0_agentview_right", - "right_image", - "observation.images.right_image", - ], - "observation.images.robot0_eye_in_hand": [ - "observation.images.robot0_eye_in_hand", - "wrist_image", - "observation.images.wrist_image", - ], - } - direct_state_keys = [ - "observation.state", - "state", - ] - explicit_state_groups = [ - [ - "observation.state.base_position", - "observation.state.base_rotation", - "observation.state.end_effector_position_relative", - "observation.state.end_effector_rotation_relative", - "observation.state.gripper_qpos", - ], - [ - "state.base_position", - "state.base_rotation", - "state.end_effector_position_relative", - "state.end_effector_rotation_relative", - "state.gripper_qpos", - ], - ] - - my_tasks = self.tasks[rank::world_size] - logging.info( - "Rank %s/%s: rebuilding %s of %s tasks", - rank, - world_size, - len(my_tasks), - len(self.tasks), - ) - if not my_tasks: - return - - shard_repo_id = f"{self.output_repo_id}_world_{world_size}_rank_{rank}" - shard_root = ( - self.work_dir - / "shards" - / self.output_repo_id.replace("/", "__") - / f"world_{world_size}" - / f"rank_{rank}" - ) - - def shard_is_complete(root: Path) -> bool: - info_path = root / "meta" / "info.json" - tasks_path = root / "meta" / "tasks.parquet" - stats_path = root / "meta" / "stats.json" - if not (info_path.exists() and tasks_path.exists() and stats_path.exists()): - return False - - episodes_dir = root / "meta" / "episodes" - data_dir = root / "data" - videos_dir = root / "videos" - if not episodes_dir.exists() or not data_dir.exists() or not videos_dir.exists(): - return False - if not any(episodes_dir.rglob("*.parquet")): - return False - if not any(data_dir.rglob("*.parquet")): - return False - if not any(videos_dir.rglob("*.mp4")): - return False - - with open(info_path) as f: - info = json.load(f) - return info.get("total_episodes", 0) > 0 and info.get("total_frames", 0) > 0 - - if shard_is_complete(shard_root) and not self.overwrite: - logging.info("Shard already complete, skipping rank %s: %s", rank, shard_root) - return - if shard_root.exists(): - if self.overwrite: - logging.warning("Removing existing shard root (--overwrite): %s", shard_root) - else: - logging.warning("Removing incomplete shard root before rebuild: %s", shard_root) - shutil.rmtree(shard_root) - - def direct_download_url(shared_url: str) -> str: - shared_id = shared_url.rstrip("/").split("/")[-1] - base = shared_url.split("/s/")[0] - return f"{base}/shared/static/{shared_id}.tar" - - def restore_v21_root_if_needed(dataset_root: Path) -> None: - old_root = dataset_root.parent / f"{dataset_root.name}_old" - if not dataset_root.exists() and old_root.exists(): - shutil.move(str(old_root), str(dataset_root)) - - def download_and_extract(shared_url: str, destination: Path) -> None: - url = direct_download_url(shared_url) - extract_dir = destination.parent - extract_dir.mkdir(parents=True, exist_ok=True) - tar_path = extract_dir / f"{destination.name}.tar" - - if destination.exists() and (destination / "meta" / "info.json").exists(): - logging.info(" Already extracted: %s", destination) - return - - for attempt in range(3): - try: - logging.info(" Downloading (attempt %s) -> %s", attempt + 1, tar_path) - urllib.request.urlretrieve(url, str(tar_path)) - break - except Exception as exc: - logging.warning(" Download attempt %s failed: %s", attempt + 1, exc) - if tar_path.exists(): - tar_path.unlink() - else: - raise RuntimeError(f"Failed to download {url} after 3 attempts") - - logging.info(" Extracting to %s", extract_dir) - with tarfile.open(tar_path, "r") as tar: - tar.extractall(path=extract_dir) - tar_path.unlink() - - def is_v30(dataset_root: Path) -> bool: - info_path = dataset_root / "meta" / "info.json" - if not info_path.exists(): - return False - with open(info_path) as f: - info = json.load(f) - return info.get("codebase_version") == "v3.0" - - def convert_v21_to_v30(dataset_root: Path) -> None: - data_mb = DEFAULT_DATA_FILE_SIZE_IN_MB - video_mb = DEFAULT_VIDEO_FILE_SIZE_IN_MB - - validate_local_dataset_version(dataset_root) - - new_root = dataset_root.parent / f"{dataset_root.name}_v30" - if new_root.exists(): - shutil.rmtree(new_root) - - convert_info(dataset_root, new_root, data_mb, video_mb) - convert_tasks(dataset_root, new_root) - episodes_metadata = convert_data(dataset_root, new_root, data_mb) - episodes_video_metadata = convert_videos(dataset_root, new_root, video_mb) - convert_episodes_metadata( - dataset_root, - new_root, - episodes_metadata, - episodes_video_metadata, - ) - - old_root = dataset_root.parent / f"{dataset_root.name}_old" - if old_root.exists(): - shutil.rmtree(old_root) - shutil.move(str(dataset_root), str(old_root)) - shutil.move(str(new_root), str(dataset_root)) - logging.info(" Conversion complete: %s", dataset_root) - - def as_float32_vector(value) -> np.ndarray: - if value.__class__.__module__.startswith("torch"): - arr = value.detach().cpu().numpy() - else: - arr = np.asarray(value) - return arr.astype(np.float32).reshape(-1) - - def to_pil_image(value) -> Image.Image: - if isinstance(value, Image.Image): - return value - if value.__class__.__module__.startswith("torch"): - arr = value.detach().cpu() - if arr.ndim != 3: - raise ValueError(f"Expected rank-3 image tensor, got shape {tuple(arr.shape)}") - if arr.shape[0] in (1, 3, 4) and arr.shape[-1] not in (1, 3, 4): - arr = arr.permute(1, 2, 0) - if getattr(arr.dtype, "is_floating_point", False): - if float(arr.max()) <= 1.0: - arr = arr * 255.0 - arr = arr.clamp(0, 255).byte() - else: - arr = arr.byte() - return Image.fromarray(arr.numpy()) - - arr = np.asarray(value) - if arr.ndim != 3: - raise ValueError(f"Expected rank-3 image array, got shape {arr.shape}") - if arr.shape[0] in (1, 3, 4) and arr.shape[-1] not in (1, 3, 4): - arr = np.transpose(arr, (1, 2, 0)) - if np.issubdtype(arr.dtype, np.floating): - if float(arr.max()) <= 1.0: - arr = arr * 255.0 - arr = np.clip(arr, 0, 255).astype(np.uint8) - elif arr.dtype != np.uint8: - arr = arr.astype(np.uint8) - return Image.fromarray(arr) - - def normalize_name(name: str) -> str: - return name.replace("/", ".").replace("_", ".").lower() - - def choose_one(available_keys: list[str], aliases: list[str], label: str) -> str: - for alias in aliases: - if alias in available_keys: - return alias - raise ValueError(f"Could not resolve {label}. Available keys: {available_keys}") - - def resolve_image_key_map(available_keys: list[str]) -> dict[str, str]: - return { - target_key: choose_one(available_keys, aliases, target_key) - for target_key, aliases in target_image_keys.items() - } - - def resolve_action_key(available_keys: list[str]) -> str: - return choose_one(available_keys, ["action", "actions"], "action") - - def state_sort_key(name: str) -> tuple[int, str]: - normalized = normalize_name(name) - if "base.position" in normalized: - return (0, normalized) - if "base.rotation" in normalized or "base.quat" in normalized: - return (1, normalized) - if "end.effector.position" in normalized or "eef.pos" in normalized: - return (2, normalized) - if "end.effector.rotation" in normalized or "eef.quat" in normalized or "eef.rot" in normalized: - return (3, normalized) - if "gripper" in normalized: - return (4, normalized) - return (5, normalized) - - def resolve_state_keys(available_keys: list[str]) -> list[str]: - for key in direct_state_keys: - if key in available_keys: - return [key] - - for group in explicit_state_groups: - if all(key in available_keys for key in group): - return group - - prefix_keys = [ - key - for key in available_keys - if key.startswith("observation.state.") or key.startswith("state.") - ] - if prefix_keys: - return sorted(prefix_keys, key=state_sort_key) - - proprio_like = [ - key - for key in available_keys - if any( - token in normalize_name(key) - for token in ["base.position", "base.rotation", "end.effector", "eef", "gripper"] - ) - ] - if proprio_like: - return sorted(set(proprio_like), key=state_sort_key) - - raise ValueError(f"Could not resolve RoboCasa proprioception keys. Available keys: {available_keys}") - - def build_state(item: dict, state_keys: list[str]) -> np.ndarray: - if len(state_keys) == 1: - return as_float32_vector(item[state_keys[0]]) - parts = [as_float32_vector(item[key]) for key in state_keys] - return np.concatenate(parts, axis=0).astype(np.float32) - - def infer_target_features( - src_dataset: LeRobotDataset, - image_key_map: dict[str, str], - action_key: str, - state_dim: int, - ) -> tuple[dict, bool]: - features = {} - use_videos = False - - for target_key, source_key in image_key_map.items(): - feature_info = copy.deepcopy(src_dataset.meta.features[source_key]) - if "fps" not in feature_info and feature_info.get("dtype") != "video": - feature_info["fps"] = int(src_dataset.meta.fps) - use_videos = use_videos or feature_info.get("dtype") == "video" - features[target_key] = feature_info - - action_info = copy.deepcopy(src_dataset.meta.features[action_key]) - action_info["dtype"] = "float32" - action_info["fps"] = int(src_dataset.meta.fps) - features["action"] = action_info - - features["observation.state"] = { - "dtype": "float32", - "shape": (state_dim,), - "names": [f"state_{i}" for i in range(state_dim)], - "fps": int(src_dataset.meta.fps), - } - - return features, use_videos - - def task_root(task_meta: dict[str, str]) -> Path: - return self.work_dir / task_meta["rel_path"] - - def cleanup_task_root(dataset_root: Path) -> None: - old_root = dataset_root.parent / f"{dataset_root.name}_old" - if dataset_root.exists(): - shutil.rmtree(dataset_root) - if old_root.exists(): - shutil.rmtree(old_root) - - shard_dataset = None - shard_meta: dict[str, int | tuple[int, ...]] | None = None - - for task_meta in my_tasks: - task_name = task_meta["task_name"] - dataset_root = task_root(task_meta) - - logging.info("--- %s (%s) ---", task_name, task_meta["tar_key"]) - restore_v21_root_if_needed(dataset_root) - - download_and_extract(task_meta["shared_url"], dataset_root) - if not is_v30(dataset_root): - convert_v21_to_v30(dataset_root) - - src_dataset = LeRobotDataset(repo_id=task_name, root=dataset_root) - available_keys = list(src_dataset.meta.features.keys()) - image_key_map = resolve_image_key_map(available_keys) - action_key = resolve_action_key(available_keys) - state_keys = resolve_state_keys(available_keys) - - if len(src_dataset) == 0: - raise ValueError(f"Task dataset is empty: {dataset_root}") - - first_item = src_dataset[0] - first_state = build_state(first_item, state_keys) - first_action = as_float32_vector(first_item[action_key]) - - if shard_dataset is None: - target_features, use_videos = infer_target_features( - src_dataset=src_dataset, - image_key_map=image_key_map, - action_key=action_key, - state_dim=int(first_state.size), - ) - shard_dataset = LeRobotDataset.create( - repo_id=shard_repo_id, - root=shard_root, - fps=int(src_dataset.meta.fps), - robot_type=self.robot_type, - features=target_features, - use_videos=use_videos, - vcodec=self.vcodec, - batch_encoding_size=1, - ) - shard_meta = { - "fps": int(src_dataset.meta.fps), - "state_dim": int(first_state.size), - "action_shape": tuple(first_action.shape), - } - else: - assert shard_meta is not None - if int(src_dataset.meta.fps) != shard_meta["fps"]: - raise ValueError( - f"FPS mismatch for {task_name}: {src_dataset.meta.fps} != {shard_meta['fps']}" - ) - if int(first_state.size) != shard_meta["state_dim"]: - raise ValueError( - f"State dim mismatch for {task_name}: {first_state.size} != {shard_meta['state_dim']}" - ) - if tuple(first_action.shape) != shard_meta["action_shape"]: - raise ValueError( - f"Action shape mismatch for {task_name}: {tuple(first_action.shape)} != " - f"{shard_meta['action_shape']}" - ) - - num_episodes = src_dataset.num_episodes - if self.max_episodes_per_task is not None: - num_episodes = min(num_episodes, self.max_episodes_per_task) - - logging.info(" Appending %s episodes into shard %s", num_episodes, shard_root) - for episode_idx in range(num_episodes): - start = int(src_dataset.meta.episodes["dataset_from_index"][episode_idx]) - end = int(src_dataset.meta.episodes["dataset_to_index"][episode_idx]) - - for frame_idx in range(start, end): - item = src_dataset[frame_idx] - frame = { - "task": task_name, - "observation.state": build_state(item, state_keys), - "action": as_float32_vector(item[action_key]), - } - for target_key, source_key in image_key_map.items(): - frame[target_key] = to_pil_image(item[source_key]) - shard_dataset.add_frame(frame) - - shard_dataset.save_episode() - - if self.cleanup_temp: - cleanup_task_root(dataset_root) - - if shard_dataset is None: - logging.warning("Rank %s produced no shard dataset", rank) - return - - shard_dataset.finalize() - logging.info("Rank %s finalized shard at %s", rank, shard_root) - - -class AggregateRoboCasaUnifiedShards(PipelineStep): - """Aggregate repaired shard datasets into one final RoboCasa dataset.""" - - def __init__( - self, - output_repo_id: str, - shard_roots: list[str], - output_root: str, - push: bool = True, - overwrite: bool = False, - hub_tags: list[str] | None = None, - ): - super().__init__() - self.output_repo_id = output_repo_id - self.shard_roots = [Path(root) for root in shard_roots] - self.output_root = Path(output_root) - self.push = push - self.overwrite = overwrite - self.hub_tags = hub_tags or ["lerobot", "robocasa", "unified"] - - def run(self, data=None, rank: int = 0, world_size: int = 1): - import json - import logging - import shutil - - from lerobot.datasets.aggregate import aggregate_datasets - from lerobot.datasets.lerobot_dataset import LeRobotDataset - from lerobot.utils.utils import init_logging - - init_logging() - - if rank != 0: - logging.info("Rank %s: only rank 0 aggregates", rank) - return - - def shard_is_complete(root: Path) -> bool: - info_path = root / "meta" / "info.json" - tasks_path = root / "meta" / "tasks.parquet" - stats_path = root / "meta" / "stats.json" - if not (info_path.exists() and tasks_path.exists() and stats_path.exists()): - return False - - episodes_dir = root / "meta" / "episodes" - data_dir = root / "data" - videos_dir = root / "videos" - if not episodes_dir.exists() or not data_dir.exists() or not videos_dir.exists(): - return False - if not any(episodes_dir.rglob("*.parquet")): - return False - if not any(data_dir.rglob("*.parquet")): - return False - if not any(videos_dir.rglob("*.mp4")): - return False - - with open(info_path) as f: - info = json.load(f) - return info.get("total_episodes", 0) > 0 and info.get("total_frames", 0) > 0 - - missing = [root for root in self.shard_roots if not shard_is_complete(root)] - if missing: - raise FileNotFoundError(f"Missing shard datasets: {missing}") - - if self.output_root.exists() and self.overwrite: - logging.warning("Removing existing unified output (--overwrite): %s", self.output_root) - shutil.rmtree(self.output_root) - - shard_repo_ids = [f"{self.output_repo_id}_shard_{idx}" for idx in range(len(self.shard_roots))] - logging.info("Aggregating %s shards into %s", len(self.shard_roots), self.output_root) - aggregate_datasets( - repo_ids=shard_repo_ids, - roots=self.shard_roots, - aggr_repo_id=self.output_repo_id, - aggr_root=self.output_root, - ) - - if self.push: - dataset = LeRobotDataset(repo_id=self.output_repo_id, root=self.output_root) - dataset.push_to_hub( - tags=self.hub_tags, - private=False, - ) - logging.info("Pushed to https://huggingface.co/datasets/%s", self.output_repo_id) - - -def make_prepare_executor( - *, - tasks: list[dict[str, str]], - output_repo_id: str, - work_dir: Path, - split: str, - robot_type: str, - overwrite: bool, - cleanup_temp: bool, - max_episodes_per_task: int | None, - vcodec: str, - job_name: str, - logs_dir: Path, - workers: int, - partition: str, - cpus_per_task: int, - mem_per_cpu: str, - time_limit: str, - slurm: bool, -): - kwargs = { - "pipeline": [ - PrepareRoboCasaUnifiedShards( - tasks=tasks, - output_repo_id=output_repo_id, - work_dir=str(work_dir), - split=split, - robot_type=robot_type, - overwrite=overwrite, - cleanup_temp=cleanup_temp, - max_episodes_per_task=max_episodes_per_task, - vcodec=vcodec, - ) - ], - "logging_dir": str(logs_dir / job_name), - } - - if slurm: - kwargs.update( - { - "job_name": job_name, - "tasks": workers, - "workers": workers, - "time": time_limit, - "partition": partition, - "cpus_per_task": cpus_per_task, - "sbatch_args": {"mem-per-cpu": mem_per_cpu}, - } - ) - return SlurmPipelineExecutor(**kwargs) - - kwargs.update({"tasks": workers, "workers": 1}) - return LocalPipelineExecutor(**kwargs) - - -def make_aggregate_executor( - *, - output_repo_id: str, - shard_roots: list[Path], - output_root: Path, - push: bool, - overwrite: bool, - job_name: str, - logs_dir: Path, - partition: str, - cpus_per_task: int, - mem_per_cpu: str, - time_limit: str, - slurm: bool, - hub_tags: list[str] | None = None, - depends: SlurmPipelineExecutor | None = None, -): - kwargs = { - "pipeline": [ - AggregateRoboCasaUnifiedShards( - output_repo_id=output_repo_id, - shard_roots=[str(root) for root in shard_roots], - output_root=str(output_root), - push=push, - overwrite=overwrite, - hub_tags=hub_tags, - ) - ], - "logging_dir": str(logs_dir / job_name), - } - - if slurm: - kwargs.update( - { - "job_name": job_name, - "tasks": 1, - "workers": 1, - "time": time_limit, - "partition": partition, - "cpus_per_task": cpus_per_task, - "sbatch_args": {"mem-per-cpu": mem_per_cpu}, - "depends": depends, - } - ) - return SlurmPipelineExecutor(**kwargs) - - kwargs.update({"tasks": 1, "workers": 1}) - return LocalPipelineExecutor(**kwargs) - - -def resolve_repo_id(args: argparse.Namespace) -> str: - if args.repo_id: - return args.repo_id - if args.hf_user: - return f"{args.hf_user}/robocasa_composite_seen_{args.split}_{args.source}_unified_v3" - raise ValueError("Pass either --repo-id or --hf-user.") - - -def main(): - parser = argparse.ArgumentParser( - description="Rebuild the 16 RoboCasa composite_seen tarballs into one unified LeRobot v3 dataset." - ) - parser.add_argument("--repo-id", type=str, default=None, help="Final unified dataset repo id.") - parser.add_argument( - "--hf-user", - type=str, - default=None, - help="Optional shorthand. If set and --repo-id is omitted, derive " - "/robocasa_composite_seen___unified_v3.", - ) - parser.add_argument("--work-dir", type=Path, required=True) - parser.add_argument("--split", type=str, default=DEFAULT_SPLIT, choices=["target", "pretrain"]) - parser.add_argument("--source", type=str, default=DEFAULT_SOURCE) - parser.add_argument( - "--mode", - type=str, - default="all", - choices=["all", "prepare", "aggregate"], - help="prepare = build shards, aggregate = merge existing shards, all = do both.", - ) - parser.add_argument( - "--task-set", - type=str, - default="composite_seen", - choices=sorted(TASK_SETS.keys()), - help="Predefined task set to restrict discovery to. Default " - "``composite_seen`` (the 16 multi-step composite_seen tasks). Use " - "``all`` to keep every discovered task in the split/source slice. " - "``--tasks`` overrides this when provided.", - ) - parser.add_argument("--robocasa-root", type=Path, default=None) - parser.add_argument("--box-links-json", type=Path, default=None) - parser.add_argument("--robot-type", type=str, default=DEFAULT_ROBOT_TYPE) - parser.add_argument("--vcodec", type=str, default="libsvtav1") - parser.add_argument("--max-episodes-per-task", type=int, default=None) - parser.add_argument("--cleanup-temp", action="store_true") - parser.add_argument("--overwrite", action="store_true") - parser.add_argument("--no-push", action="store_true") - parser.add_argument("--logs-dir", type=Path, default=Path("logs")) - parser.add_argument("--job-name", type=str, default="port_robocasa_composite_seen") - parser.add_argument("--slurm", type=int, default=1, help="1 = Slurm executor, 0 = local debug.") - parser.add_argument( - "--workers", - type=int, - default=16, - help="Number of SLURM workers. Default 16 = one per composite_seen task.", - ) - parser.add_argument("--partition", type=str, default="hopper-cpu") - parser.add_argument( - "--cpus-per-task", - type=int, - default=8, - help="CPUs per worker. 16 workers × 8 cpus = 128 cpus total on hopper-cpu.", - ) - parser.add_argument("--mem-per-cpu", type=str, default="4G") - parser.add_argument("--time", type=str, default="24:00:00") - parser.add_argument( - "--tasks", - type=str, - nargs="*", - default=None, - help="Explicit task names. Overrides --task-set when provided.", - ) - parser.add_argument("--dryrun", action="store_true") - args = parser.parse_args() - - box_links_json = _resolve_box_links_json(args.box_links_json, args.robocasa_root) - all_tasks = _discover_tasks(box_links_json, split=args.split, source=args.source) - - # Filter: explicit --tasks wins; otherwise apply --task-set. - if args.tasks: - selected = {task.lower() for task in args.tasks} - all_tasks = [task for task in all_tasks if task["task_name"].lower() in selected] - elif args.task_set != "all": - wanted = {t.lower() for t in TASK_SETS[args.task_set]} - all_tasks = [task for task in all_tasks if task["task_name"].lower() in wanted] - - if not all_tasks: - raise ValueError( - f"No RoboCasa tasks selected for split={args.split!r}, source={args.source!r}, " - f"task_set={args.task_set!r}, tasks={args.tasks!r}" - ) - - print(f"Tasks to rebuild ({len(all_tasks)}):") - for task in all_tasks: - print(f" {task['task_name']} ({task['tar_key']})") - if args.dryrun: - return - - output_repo_id = resolve_repo_id(args) - output_root = args.work_dir / "unified" / output_repo_id - active_ranks = [rank for rank in range(args.workers) if all_tasks[rank::args.workers]] - shard_roots = [ - args.work_dir - / "shards" - / output_repo_id.replace("/", "__") - / f"world_{args.workers}" - / f"rank_{rank}" - for rank in active_ranks - ] - - prepare_executor = None - if args.mode in {"all", "prepare"}: - prepare_executor = make_prepare_executor( - tasks=all_tasks, - output_repo_id=output_repo_id, - work_dir=args.work_dir, - split=args.split, - robot_type=args.robot_type, - overwrite=args.overwrite, - cleanup_temp=args.cleanup_temp, - max_episodes_per_task=args.max_episodes_per_task, - vcodec=args.vcodec, - job_name=args.job_name, - logs_dir=args.logs_dir, - workers=args.workers, - partition=args.partition, - cpus_per_task=args.cpus_per_task, - mem_per_cpu=args.mem_per_cpu, - time_limit=args.time, - slurm=args.slurm == 1, - ) - if args.mode == "prepare": - prepare_executor.run() - - if args.mode in {"all", "aggregate"}: - hub_tags = ["lerobot", "robocasa", "unified", args.split, args.source] - if not args.tasks and args.task_set != "all": - hub_tags.append(args.task_set) - aggregate_executor = make_aggregate_executor( - output_repo_id=output_repo_id, - shard_roots=shard_roots, - output_root=output_root, - push=not args.no_push, - overwrite=args.overwrite, - job_name=f"{args.job_name}_aggregate", - logs_dir=args.logs_dir, - partition=args.partition, - cpus_per_task=args.cpus_per_task, - mem_per_cpu=args.mem_per_cpu, - time_limit=args.time, - slurm=args.slurm == 1, - hub_tags=hub_tags, - depends=prepare_executor if args.mode == "all" and args.slurm == 1 else None, - ) - if args.mode == "all" and args.slurm == 1: - # SLURM: submitting the aggregate executor with depends=prepare_executor - # transitively submits prepare too, with the right --dependency=afterok. - aggregate_executor.run() - elif args.mode == "all": - # Local: run sequentially. - assert prepare_executor is not None - prepare_executor.run() - aggregate_executor.run() - else: - aggregate_executor.run() - - -if __name__ == "__main__": - main() diff --git a/scripts/build_episode_filter.py b/scripts/build_episode_filter.py deleted file mode 100644 index db1d16304..000000000 --- a/scripts/build_episode_filter.py +++ /dev/null @@ -1,162 +0,0 @@ -#!/usr/bin/env python -# Copyright 2026 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Emit the ``--dataset.episodes`` include-list for a LeRobot dataset, minus a -set of excluded episode indices. - -``LeRobotDatasetConfig.episodes`` is an *include* list (train only on the listed -episodes), so "exclude episode X" means "pass every episode except X". This -helper builds that complement. - -For ``pepijn223/robocasa_pretrain_human300_v4`` the default exclusion set is the -63 episodes that carry NO ``subtask`` annotation (in fact no persistent language -rows at all) — see the scan in this PR's discussion. Training the steerable -SmolVLA/pi052 policy on those episodes would feed it frames with empty subtask -targets, so we drop them. - -Usage (prints a compact ``[0,1,2,...]`` list to stdout, logs to stderr): - - python scripts/build_episode_filter.py \ - --repo-id pepijn223/robocasa_pretrain_human300_v4 - - # capture in a shell script - EPISODES=$(python scripts/build_episode_filter.py --repo-id ) - lerobot-train ... --dataset.episodes="$EPISODES" - -The helper reads ``meta/info.json`` from the Hub to learn ``total_episodes`` and -validates that every excluded index is in ``[0, total_episodes)`` before emitting -the complement. Pass ``--no-validate-hub`` to skip the network round-trip and use -``--total-episodes`` directly (e.g. for an offline / local dataset). -""" - -from __future__ import annotations - -import argparse -import json -import sys - -# Episodes in pepijn223/robocasa_pretrain_human300_v4 with no `subtask` -# annotation (no persistent language rows at all). 63 episodes / 179,009 frames. -DEFAULT_EXCLUDE: tuple[int, ...] = ( - 1065, 2972, 6971, 8129, 9167, 9170, 9171, 9177, 9190, 9196, 9199, 9204, - 9207, 9208, 9210, 9217, 9232, 9234, 9240, 9243, 9254, 9256, 9258, 9259, - 9261, 9263, 9264, 15928, 16350, 18729, 20026, 21703, 25314, 25319, 25321, - 25324, 25333, 25340, 25356, 25366, 25374, 25388, 25392, 25825, 25893, - 26347, 26357, 26374, 26375, 26388, 26394, 26398, 26400, 26409, 26422, - 26423, 26426, 26895, 26905, 26915, 26954, 27064, 30812, -) - - -def _log(msg: str) -> None: - print(msg, file=sys.stderr, flush=True) - - -def _total_episodes_from_hub(repo_id: str, revision: str | None) -> int: - """Return ``total_episodes`` from the dataset's ``meta/info.json`` on the Hub.""" - from huggingface_hub import hf_hub_download - - path = hf_hub_download( - repo_id=repo_id, - filename="meta/info.json", - repo_type="dataset", - revision=revision, - ) - with open(path) as f: - info = json.load(f) - total = int(info["total_episodes"]) - if total <= 0: - raise ValueError(f"info.json reports non-positive total_episodes={total!r}") - return total - - -def build_include_list(total_episodes: int, exclude: set[int]) -> list[int]: - """Return ``[0, total_episodes)`` with ``exclude`` removed, ascending.""" - out_of_range = sorted(e for e in exclude if e < 0 or e >= total_episodes) - if out_of_range: - raise ValueError( - f"{len(out_of_range)} excluded index(es) outside [0, {total_episodes}): " - f"{out_of_range[:10]}{'...' if len(out_of_range) > 10 else ''}. " - "The dataset may have changed — re-run the subtask scan before training." - ) - return [e for e in range(total_episodes) if e not in exclude] - - -def main() -> int: - p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) - p.add_argument("--repo-id", default="pepijn223/robocasa_pretrain_human300_v4") - p.add_argument("--revision", default=None, help="Dataset revision/branch (default: main).") - p.add_argument( - "--exclude-file", - default=None, - help="Optional JSON file with a list of episode indices to exclude. " - "Overrides the built-in default set.", - ) - p.add_argument( - "--total-episodes", - type=int, - default=None, - help="Total episode count. If omitted, read from meta/info.json on the Hub.", - ) - p.add_argument( - "--no-validate-hub", - action="store_true", - help="Do not fetch info.json from the Hub; requires --total-episodes.", - ) - p.add_argument( - "--out", - default=None, - help="Write the list to this file instead of stdout.", - ) - args = p.parse_args() - - if args.exclude_file: - with open(args.exclude_file) as f: - data = json.load(f) - # Accept either a bare list or the {"missing_episode_indices": [...]} report shape. - exclude = set(data["missing_episode_indices"] if isinstance(data, dict) else data) - else: - exclude = set(DEFAULT_EXCLUDE) - - if args.total_episodes is not None: - total = args.total_episodes - if not args.no_validate_hub: - hub_total = _total_episodes_from_hub(args.repo_id, args.revision) - if hub_total != total: - raise ValueError( - f"--total-episodes={total} disagrees with Hub info.json total_episodes={hub_total}." - ) - else: - if args.no_validate_hub: - raise SystemExit("--no-validate-hub requires --total-episodes.") - total = _total_episodes_from_hub(args.repo_id, args.revision) - - include = build_include_list(total, exclude) - _log( - f"[build_episode_filter] repo={args.repo_id} total={total} " - f"excluded={len(exclude)} kept={len(include)}" - ) - - # Compact JSON (no spaces) so the resulting CLI arg stays as short as possible. - payload = "[" + ",".join(map(str, include)) + "]" - if args.out: - with open(args.out, "w") as f: - f.write(payload) - _log(f"[build_episode_filter] wrote {len(payload)} bytes to {args.out}") - else: - sys.stdout.write(payload) - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/scripts/build_robocasa_smoke.sh b/scripts/build_robocasa_smoke.sh deleted file mode 100755 index e36b5f09f..000000000 --- a/scripts/build_robocasa_smoke.sh +++ /dev/null @@ -1,47 +0,0 @@ -#!/bin/bash -# Build a tiny RoboCasa smoke dataset (2 short atomic tasks, all episodes) for -# fast end-to-end training validation before the real run. -# -# Defaults: target/human, OpenStandMixerHead + NavigateKitchen (~1k episodes, -# ~131k frames, ~109 min @ 20 fps), 2 SLURM workers on hopper-cpu. -# -# Override via env: TASKS, REPO_ID, WORK_DIR, WORKERS, CPUS, PARTITION, LOCAL=1. - -set -euo pipefail - -cd "${LEROBOT_ROOT:-$HOME/lerobot}" -source ~/miniconda3/etc/profile.d/conda.sh -conda activate lerobot - -REPO_ID="${REPO_ID:-${HF_USER:?HF_USER is unset}/robocasa_smoke_2atomic_v3}" -WORK_DIR="${WORK_DIR:-/fsx/${USER}/robocasa/datasets/v1.0}" -ROBOCASA_ROOT="${ROBOCASA_ROOT:-/fsx/${USER}/robocasa}" -LOGS_DIR="${LOGS_DIR:-/fsx/${USER}/logs/robocasa}" -TASKS="${TASKS:-OpenStandMixerHead NavigateKitchen}" -WORKERS="${WORKERS:-2}" -CPUS="${CPUS:-8}" -PARTITION="${PARTITION:-hopper-cpu}" -LOCAL="${LOCAL:-0}" - -ARGS=( - examples/port_datasets/slurm_build_robocasa_composite_seen.py - --repo-id="$REPO_ID" - --work-dir="$WORK_DIR" - --robocasa-root="$ROBOCASA_ROOT" - --split=target --source=human - --tasks $TASKS - --workers="$WORKERS" - --cpus-per-task="$CPUS" - --partition="$PARTITION" - --mem-per-cpu=4G - --time=04:00:00 - --logs-dir="$LOGS_DIR" - --job-name=port_robocasa_smoke -) -if [[ "$LOCAL" == "1" ]]; then - ARGS+=(--slurm=0) -fi - -echo "Smoke dataset: $REPO_ID" -echo "Tasks: $TASKS" -python "${ARGS[@]}" diff --git a/scripts/train_pi052_human300_exclude_unannotated.sh b/scripts/train_pi052_human300_exclude_unannotated.sh deleted file mode 100755 index ca03c2e16..000000000 --- a/scripts/train_pi052_human300_exclude_unannotated.sh +++ /dev/null @@ -1,115 +0,0 @@ -#!/bin/bash -#SBATCH --job-name=pi052-hirobot-robocasa-human300 -#SBATCH --partition=hopper-prod -#SBATCH --qos=high -#SBATCH --time=48:00:00 -#SBATCH --ntasks=1 -#SBATCH --gpus-per-task=8 - -set -euo pipefail - -cd "${LEROBOT_ROOT:-$HOME/lerobot}" - -export LEROBOT_DEBUG_PREDS_EVERY=1000 -export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:$PATH" -export LD_LIBRARY_PATH="$HOME/miniconda3/lib:${LD_LIBRARY_PATH:-}" -export NCCL_TIMEOUT="${NCCL_TIMEOUT:-1800}" -export HF_HUB_DOWNLOAD_TIMEOUT="${HF_HUB_DOWNLOAD_TIMEOUT:-120}" -export WANDB_INIT_TIMEOUT="${WANDB_INIT_TIMEOUT:-300}" -export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-expandable_segments:True}" -# Compile path: pin triton + inductor caches node-local. The shared -# /fsx cache mixes kernels built against different glibc versions and -# trips ``GLIBC_2.34 not found`` on hopper nodes (bench v3 confirmed). -export TRITON_CACHE_DIR="/tmp/triton_${SLURM_JOB_ID}" -export TORCHINDUCTOR_CACHE_DIR="/tmp/torchinductor_${SLURM_JOB_ID}" -mkdir -p "$TRITON_CACHE_DIR" "$TORCHINDUCTOR_CACHE_DIR" - -# Non-fatal so an unstaged local hotfix doesn't kill the job. CI / clean -# checkouts still fast-forward as before; dirty trees just keep their -# in-flight changes (the working tree is what runs). -git pull --ff-only || echo "[warn] git pull skipped — keeping working tree." -python -m pip install -q --upgrade -e . -python -m pip install -q --upgrade -e '.[pi]' -python -m pip install -q --upgrade 'liger-kernel' - -# FlashAttention-2 is NOT installed. The pi052 dual-expert layer compute -# uses SDPA (the block-bidirectional mask is unsupported by FA2 anyway), -# and the only other consumer would be liger-kernel — which gracefully -# degrades when flash_attn is absent. The previously-installed wheel was -# built against a newer GLIBC than some hopper compute nodes provide -# (job 22162586 on ip-26-0-162-14 hit ``GLIBC_2.32 not found``), so the -# safest configuration is "not installed". To re-enable for the -# downstream HF Gemma ``generate`` path, install a wheel matching the -# node's libc — but verify on every assigned node first. - -DATASET="pepijn223/robocasa_pretrain_human300_v4" -DATASET_REVISION="${DATASET_REVISION:-main}" -POLICY_REPO_ID="pepijn223/pi052_robocasa_human300" -JOB_NAME="pi052-hirobot-robocasa-human300" -NUM_PROCESSES=8 -# BS=36 — fits ~72 GB / 80 GB, BS=36 × 8 GPUs = 288 effective. -BATCH_SIZE=${BATCH_SIZE:-36} -STEPS=${STEPS:-5000} -RUN_ID="${SLURM_JOB_ID:-$(date +%Y%m%d_%H%M%S)}" -OUTPUT_DIR="/fsx/pepijn/outputs/train/pi052_robocasa_human300_${RUN_ID}" - -# --- Exclude un-annotated episodes ----------------------------------------- -# 63 episodes in this dataset carry NO `subtask` annotation (no persistent -# language rows at all). `--dataset.episodes` is an INCLUDE list, so we pass -# the complement: every episode index except those 63. The helper reads -# meta/info.json from the Hub to confirm total_episodes (32043) and validates -# the excluded indices are in range before emitting the list. If the dataset -# version changes such that the indices fall out of range, the helper aborts -# the job rather than silently training on the wrong episodes. -echo "Building episode include-list (excluding un-annotated episodes)..." -EPISODES=$(python scripts/build_episode_filter.py \ - --repo-id "$DATASET" \ - --revision "$DATASET_REVISION") - -echo "Training pi052 on $DATASET with ${NUM_PROCESSES} GPUs, batch size ${BATCH_SIZE}/GPU, ${STEPS} steps" -echo "Output directory: $OUTPUT_DIR" -export LEROBOT_DUMP_RECIPE_SAMPLES=8 - -accelerate launch --multi_gpu --num_processes="$NUM_PROCESSES" \ - -m lerobot.scripts.lerobot_train \ - --policy.type=pi052 \ - --policy.pretrained_path=lerobot/pi05_base \ - --policy.recipe_path=recipes/subtask_mem_vqa_robocasa.yaml \ - --dataset.repo_id="$DATASET" \ - --dataset.revision="$DATASET_REVISION" \ - --dataset.episodes="$EPISODES" \ - --dataset.video_backend=pyav \ - --output_dir="$OUTPUT_DIR" \ - --job_name="$JOB_NAME" \ - --policy.repo_id="$POLICY_REPO_ID" \ - --policy.compile_model=true \ - --policy.compile_mode=default \ - --policy.gradient_checkpointing=true \ - --policy.device=cuda \ - --policy.tokenizer_max_length=256 \ - --policy.action_tokenizer_name=lerobot/fast-action-tokenizer \ - --policy.chunk_size=30 \ - --policy.n_action_steps=30 \ - --policy.max_action_tokens=256 \ - --steps="$STEPS" \ - --policy.scheduler_decay_steps="$STEPS" \ - --batch_size="$BATCH_SIZE" \ - --wandb.enable=true \ - --policy.dtype=bfloat16 \ - --policy.optimizer_lr=5e-5 \ - --policy.optimizer_grad_clip_norm=1.0 \ - --policy.scheduler_decay_lr=5e-6 \ - --policy.lm_head_lr_scale=5.0 \ - --ema.enable=true \ - --wandb.disable_artifact=true \ - --wandb.project=hirobot \ - --log_freq=100 \ - --save_freq=5000 \ - --num_workers=4 \ - --prefetch_factor=4 \ - --persistent_workers=true \ - --dataset.image_transforms.enable=true \ - --dataset.image_transforms.max_num_transforms=3 \ - --dataset.image_transforms.random_order=true \ - --policy.auto_fit_fast_tokenizer=true \ - --policy.knowledge_insulation=true diff --git a/src/lerobot/scripts/build_robocasa_composite_seen.py b/src/lerobot/scripts/build_robocasa_composite_seen.py deleted file mode 100644 index 811c06392..000000000 --- a/src/lerobot/scripts/build_robocasa_composite_seen.py +++ /dev/null @@ -1,345 +0,0 @@ -#!/usr/bin/env python3 -"""Build a single combined LeRobotDataset from RoboCasa's 16 composite_seen tasks. - -RoboCasa 1.0 already ships in LeRobot format (parquet + mp4), distributed as -``lerobot.tar`` archives from Box. This script: - -1. Downloads each composite_seen task's ``target/human`` archive via RoboCasa's - official ``download_datasets`` helper (idempotent — skipped if already on - disk). -2. Opens each extracted directory as a ``LeRobotDataset``. -3. Merges all 16 into one unified dataset via ``merge_datasets`` (a thin wrapper - over ``aggregate_datasets`` that revalidates fps / robot_type / features, - unifies task indices, concatenates videos and parquet, and recomputes stats). -4. Optionally pushes the merged dataset to the Hub. - -The result is one ~8,000-trajectory dataset where each episode carries its -source task as the ``task`` field — ready for downstream annotation -(subtasks / memory / VQA / tool calls) without per-task bookkeeping. - -Usage:: - - uv run python -m lerobot.scripts.build_robocasa_composite_seen \\ - --output-dir=/data/lerobot/robocasa_composite_seen \\ - --hub-repo-id=${HF_USER}/robocasa_composite_seen \\ - --push-to-hub - -Prereqs: ``robocasa`` and ``robosuite`` installed (see -``docs/source/benchmarks/robocasa.mdx`` for the editable-install dance — they -are not on PyPI and RoboCasa's own ``setup.py`` pins an old LeRobot version). - -The 16 composite_seen tasks are the multi-step subset of the official -RoboCasa365 target benchmark — exactly the slice used to compute the -``Composite-Seen`` column of the leaderboard. -""" - -from __future__ import annotations - -import argparse -import logging -import sys -from pathlib import Path - -from lerobot.datasets.dataset_tools import merge_datasets -from lerobot.datasets.lerobot_dataset import LeRobotDataset - -logger = logging.getLogger(__name__) - -# Canonical 16 composite_seen tasks (RoboCasa365 target benchmark). -# Order matches the leaderboard docs. -COMPOSITE_SEEN_TASKS: list[str] = [ - "DeliverStraw", - "GetToastedBread", - "KettleBoiling", - "LoadDishwasher", - "PackIdenticalLunches", - "PreSoakPan", - "PrepareCoffee", - "RinseSinkBasin", - "ScrubCuttingBoard", - "SearingMeat", - "SetUpCuttingStation", - "StackBowlsCabinet", - "SteamInMicrowave", - "StirVegetables", - "StoreLeftoversInBowl", - "WashLettuce", -] - - -def _require_robocasa() -> None: - """Fail fast with an actionable message if robocasa is missing. - - RoboCasa is not on PyPI and is not a LeRobot extra — see the installation - notes in ``docs/source/benchmarks/robocasa.mdx``. - """ - try: - import robocasa # noqa: F401, PLC0415 - from robocasa.scripts import download_datasets as _dl # noqa: F401, PLC0415 - from robocasa.utils import dataset_registry as _reg # noqa: F401, PLC0415 - except ImportError as exc: - sys.exit( - "[build_robocasa_composite_seen] robocasa is not importable.\n" - "Install it (and robosuite) per the LeRobot RoboCasa docs:\n" - " git clone https://github.com/robocasa/robocasa.git ~/robocasa\n" - " git clone https://github.com/ARISE-Initiative/robosuite.git ~/robosuite\n" - " pip install -e ~/robocasa --no-deps\n" - " pip install -e ~/robosuite\n" - f"(original error: {exc})" - ) - - -def _resolve_task_root(task: str) -> Path: - """Resolve the local extracted ``LeRobotDataset`` root for a target/human task. - - Uses RoboCasa's own ``dataset_registry`` so we follow whatever directory - layout RoboCasa picks (currently ``v1.0/target/composite///`` - under ``robocasa.macros.DATASET_BASE_DIR``). Falls back to discovering the - extracted directory if the helper's signature drifted between releases. - """ - from robocasa.utils import dataset_registry # noqa: PLC0415 - - # ``get_ds_path`` is the canonical helper. RoboCasa 1.0 signature is - # ``get_ds_path(task, ds_type, return_info=False)`` with ``ds_type`` like - # ``"human_im"`` (image-observation human demos). We try the common - # ``split=`` kwarg first (newer registry); if it's rejected, fall back. - try: - ds_path = dataset_registry.get_ds_path( - task=task, - ds_type="human_im", - return_info=False, - split="target", - ) - except TypeError: - # Older registry — ds_type alone disambiguates target/human. - ds_path = dataset_registry.get_ds_path( - task=task, - ds_type="human_im", - return_info=False, - ) - - root = Path(ds_path) - # ``get_ds_path`` may return either the extracted dir or the .tar; normalize. - if root.suffix == ".tar": - root = root.parent - return root - - -def _download_task(task: str, *, overwrite: bool = False) -> Path: - """Download (or locate) a single target/human task and return its extracted root.""" - from robocasa.scripts import download_datasets as dl # noqa: PLC0415 - - # Try the documented programmatic API. The CLI is - # python -m robocasa.scripts.download_datasets --tasks --source human --split target - # which is a thin wrapper over a function of the same name. - if hasattr(dl, "download_datasets"): - try: - dl.download_datasets( - tasks=[task], - source="human", - split="target", - overwrite=overwrite, - ) - except TypeError: - # Older signature — drop the kwargs RoboCasa didn't have yet. - dl.download_datasets(tasks=[task]) - else: - # No public function — shell out to the CLI as a last resort. This - # guarantees we use whatever entrypoint RoboCasa's authors maintain. - import subprocess # noqa: PLC0415 - - cmd = [ - sys.executable, - "-m", - "robocasa.scripts.download_datasets", - "--tasks", - task, - "--source", - "human", - "--split", - "target", - ] - if overwrite: - cmd.append("--overwrite") - subprocess.run(cmd, check=True) - - root = _resolve_task_root(task) - if not root.exists(): - raise RuntimeError( - f"Expected {root} after download, but it doesn't exist. " - "RoboCasa may have changed its data layout — verify with " - "`robocasa.utils.dataset_registry.get_ds_path()`." - ) - return root - - -def _open_as_lerobot_dataset(task: str, root: Path) -> LeRobotDataset: - """Open an extracted RoboCasa target/human task as a ``LeRobotDataset``. - - The placeholder ``repo_id`` (``robocasa/_target_human``) is only used - by the aggregator for logging and for the unified task table — the actual - data is loaded from ``root``. - """ - repo_id = f"robocasa/{task}_target_human" - return LeRobotDataset(repo_id=repo_id, root=root) - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser( - description="Aggregate the 16 RoboCasa composite_seen target tasks into one LeRobotDataset.", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=__doc__, - ) - parser.add_argument( - "--output-dir", - type=Path, - required=True, - help="Local directory for the merged dataset (will be created).", - ) - parser.add_argument( - "--hub-repo-id", - type=str, - default=None, - help=( - "Hub repo_id for the merged dataset (e.g. ``yourname/" - "robocasa_composite_seen``). Required for ``--push-to-hub``; also " - "becomes the merged dataset's canonical ``repo_id``." - ), - ) - parser.add_argument( - "--push-to-hub", - action="store_true", - help="Push the merged dataset to the Hub after building. Requires " - "``--hub-repo-id`` and a prior ``huggingface-cli login``.", - ) - parser.add_argument( - "--private", - action="store_true", - help="When pushing, create the Hub repo as private.", - ) - parser.add_argument( - "--tasks", - type=str, - default=None, - help="Comma-separated task names to override the default 16 " - "composite_seen list (useful for smoke-testing with 1–2 tasks).", - ) - parser.add_argument( - "--skip-download", - action="store_true", - help="Skip the download step entirely; assume each task is already " - "extracted on disk at the path ``dataset_registry.get_ds_path`` " - "returns.", - ) - parser.add_argument( - "--overwrite-download", - action="store_true", - help="Force re-download even when a complete local extraction exists.", - ) - parser.add_argument( - "--log-level", - type=str, - default="INFO", - choices=["DEBUG", "INFO", "WARNING", "ERROR"], - ) - return parser.parse_args() - - -def main() -> int: - args = parse_args() - logging.basicConfig( - level=getattr(logging, args.log_level), - format="[%(levelname)s] %(message)s", - ) - - tasks = ( - [t.strip() for t in args.tasks.split(",") if t.strip()] - if args.tasks - else list(COMPOSITE_SEEN_TASKS) - ) - if not tasks: - sys.exit("No tasks selected.") - - if args.push_to_hub and not args.hub_repo_id: - sys.exit("--push-to-hub requires --hub-repo-id.") - - output_repo_id = args.hub_repo_id or "local/robocasa_composite_seen" - logger.info( - "Building merged RoboCasa dataset: %d tasks → %s (output dir: %s)", - len(tasks), - output_repo_id, - args.output_dir, - ) - - _require_robocasa() - - # 1. Download (or locate) each task's extracted directory. - task_roots: list[tuple[str, Path]] = [] - for i, task in enumerate(tasks, 1): - logger.info("[%d/%d] %s", i, len(tasks), task) - if args.skip_download: - root = _resolve_task_root(task) - if not root.exists(): - sys.exit( - f"--skip-download set but extracted directory does not " - f"exist for {task}: {root}" - ) - else: - root = _download_task(task, overwrite=args.overwrite_download) - logger.info(" extracted at: %s", root) - task_roots.append((task, root)) - - # 2. Open each as a LeRobotDataset (validation happens inside aggregator). - datasets: list[LeRobotDataset] = [] - for task, root in task_roots: - logger.info("Opening %s", task) - ds = _open_as_lerobot_dataset(task, root) - logger.info( - " %s: %d episodes, %d frames, %d FPS", - task, - ds.num_episodes, - ds.num_frames, - ds.fps, - ) - datasets.append(ds) - - # 3. Merge — re-validates features/fps/robot_type, unifies tasks, concats - # videos + parquet, recomputes stats. - logger.info("Merging %d datasets into %s", len(datasets), output_repo_id) - merged = merge_datasets( - datasets=datasets, - output_repo_id=output_repo_id, - output_dir=args.output_dir, - ) - logger.info( - "Merged: %d episodes, %d frames across %d unique task strings", - merged.num_episodes, - merged.num_frames, - len(merged.meta.tasks) if merged.meta.tasks is not None else 0, - ) - - # 4. Push to Hub. - if args.push_to_hub: - logger.info("Pushing %s to the Hub (private=%s)", args.hub_repo_id, args.private) - # ``upload_large_folder=True`` is the right mode for tens-of-GB - # datasets — uses multipart uploads + resumable transfers. - merged.push_to_hub( - private=args.private, - upload_large_folder=True, - tags=["lerobot", "robocasa", "composite_seen", "manipulation"], - ) - logger.info( - "Push complete: https://huggingface.co/datasets/%s", - args.hub_repo_id, - ) - else: - logger.info( - "Skipping Hub push (no --push-to-hub). Merged dataset is at %s.", - args.output_dir, - ) - - return 0 - - -if __name__ == "__main__": - raise SystemExit(main())