Incorporate Wall-X model into LeRobot

Geoffrey19
2025-12-02 10:46:13 +08:00
committed by Michel Aractingi
parent 2cf509795e
commit a8e7a2967c
7 changed files with 6136 additions and 105 deletions
@@ -0,0 +1,21 @@
#!/usr/bin/env python
# Copyright 2025 Physical Intelligence and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .configuration_wall_x import WallXConfig
from .modeling_wall_x import WallXPolicy
from .processor_wall_x import make_wall_x_pre_post_processors
__all__ = ["WallXConfig", "WallXPolicy", "make_wall_x_pre_post_processors"]
@@ -13,6 +13,7 @@
# limitations under the License.
from dataclasses import dataclass, field
from typing import Any
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
@@ -29,8 +30,48 @@ class WallXConfig(PreTrainedConfig):
Wall-X builds on Qwen2.5-VL, adding action prediction via flow matching.
It supports cross-embodiment robotic control through unified action representations.
This config covers multi-modal training with vision, language, and action data.
"""
# Input / output structure
# ==================== Model and Paths Configuration ====================
# Logging
log_name: str = "wall_x_training"
log_project: str = "vla_training"
model_type: str = "wall-oss"
# Pretrained model paths
pretrained_wallx_path: str | None = None # Path to pretrained Wall-X model
save_path: str | None = None # Path to save checkpoints
processor_path: str | None = None # Path to processor (defaults to pretrained_wallx_path)
action_tokenizer_path: str | None = None # Path to action tokenizer (for FAST mode)
# Tokenizer settings
use_fast_tokenizer: bool = False # True: train FAST, False: train Flow
# ==================== Profiling Configuration ====================
profile: bool = False
profile_save_path: str | None = None
profile_wait_iters: int = 10
profile_warmup_iters: int = 5
profile_active_iters: int = 2
# ==================== Training Hyperparameters ====================
num_warmup_steps: int = 100
num_training_steps: int = 64000000
learning_rate: float = 5e-5
min_lr: float = 5e-5
num_epoch: int = 100
gradient_accumulation_steps: int = 32
batch_size_per_gpu: int = 8
padding_side: str = "left"
epoch_save_interval: int = 10
# Training optimization
fsdp2: bool = False
torch_compile: bool = False
# ==================== Input / Output Structure ====================
n_obs_steps: int = 1
chunk_size: int = 32 # action_horizon in wall-x
n_action_steps: int = 32
@@ -53,7 +94,7 @@ class WallXConfig(PreTrainedConfig):
# Tokenizer
tokenizer_max_length: int = 256
# Model architecture
# ==================== Model Architecture ====================
vlm_model_name: str = "Qwen/Qwen2.5-VL-3B-Instruct"
load_vlm_weights: bool = True
@@ -79,6 +120,7 @@ class WallXConfig(PreTrainedConfig):
num_key_value_heads: int = 4 # 8 for 7B model
vocab_size: int = 152064
# ==================== Action Prediction ====================
# Action prediction mode: "flow" or "fast"
prediction_mode: str = "flow"
@@ -93,7 +135,8 @@ class WallXConfig(PreTrainedConfig):
num_inference_timesteps: int = 10 # Number of ODE solver steps
ode_solver_method: str = "euler" # ODE solver method
# Degrees of freedom configuration - example for bimanual robot
# ==================== Robot Configuration ====================
# Degrees of freedom configuration - defines action space
dof_config: dict = field(default_factory=lambda: {
"left_ee_pos": 3,
"left_ee_rot": 3,
@@ -103,7 +146,7 @@ class WallXConfig(PreTrainedConfig):
"right_gripper": 1,
})
# Proprioception configuration (mirrors dof_config)
# Proprioception configuration (typically mirrors dof_config)
agent_pos_config: dict = field(default_factory=lambda: {
"left_ee_pos": 3,
"left_ee_rot": 3,
@@ -113,12 +156,23 @@ class WallXConfig(PreTrainedConfig):
"right_gripper": 1,
})
# MoE configuration
# Customized robot configuration
enable_customized_robot_config: bool = False
customized_robot_config: dict = field(default_factory=lambda: {
"name": "",
"customized_dof_config": {},
"customized_agent_pos_config": {},
})
# Normalization statistics path
norm_stats_path: str | None = None
# ==================== MoE Configuration ====================
num_experts: int = 4
attention_moe: bool = False
mlp_moe: bool = False
# Finetuning settings
# ==================== Finetuning Settings ====================
freeze_vision_encoder: bool = True
train_expert_only: bool = False # wall-x trains more components
train_action_head: bool = True
@@ -126,7 +180,7 @@ class WallXConfig(PreTrainedConfig):
# Cache
use_cache: bool = True
# Training presets
# ==================== Optimizer Presets ====================
optimizer_lr: float = 2e-5
optimizer_betas: tuple[float, float] = (0.9, 0.95)
optimizer_eps: float = 1e-8
@@ -137,14 +191,48 @@ class WallXConfig(PreTrainedConfig):
scheduler_decay_steps: int = 100000
scheduler_decay_lr: float = 1e-6
# ==================== Dataset Configuration ====================
# Dataset-specific normalization statistics
# Maps dataset names to {min, delta} for action normalization
action_statistics: dict = field(default_factory=dict)
# Data configuration
data_config: dict = field(default_factory=lambda: {
"use_lerobot": True,
"lerobot_config": {
"repo_id": "",
"root": None,
"episodes": None,
"image_transforms": None,
"delta_timestamps": None,
"tolerance_s": 1e-4,
"revision": None,
"force_cache_sync": False,
"download_videos": True,
"video_backend": None,
},
"action_horizon": 32,
"train_test_split": 0.95,
"obs_action_keys": [],
"predict_action_keys": [],
"resolution": {
"face_view": 256,
"left_wrist_view": 256,
"right_wrist_view": 256,
"move1_view": 256,
"move2_view": 256,
"top_view": 256,
"wall_view": 256,
"multi_modal": 256,
},
})
# ==================== Resume Configuration ====================
resume_config: dict | None = None
def __post_init__(self):
super().__post_init__()
"""Input validation"""
# Input validation
if self.n_action_steps > self.chunk_size:
raise ValueError(
f"The chunk size is the upper bound for the number of action steps per model invocation. Got "
@@ -163,6 +251,232 @@ class WallXConfig(PreTrainedConfig):
f"Total DOF ({total_dof}) exceeds max_action_dim ({self.max_action_dim})"
)
# Sync prediction_mode with use_fast_tokenizer
if self.use_fast_tokenizer:
self.prediction_mode = "fast"
else:
self.prediction_mode = "flow"
def get_train_config(self) -> dict:
"""
Extract the complete train_config dictionary matching the YAML training configuration format.
This method constructs the full train_config from WallXConfig fields, suitable for
training scripts and Qwen2_5_VLMoEForAction.from_pretrained.
Returns:
dict: Complete training configuration matching YAML structure.
"""
# Build customized_robot_config
if self.enable_customized_robot_config and self.customized_robot_config:
customized_robot_config = {
"name": self.customized_robot_config.get("name", ""),
"customized_dof_config": self.customized_robot_config.get(
"customized_dof_config", self.dof_config
),
"customized_agent_pos_config": self.customized_robot_config.get(
"customized_agent_pos_config", self.agent_pos_config
),
}
else:
customized_robot_config = {
"name": self.data_config.get("lerobot_config", {}).get("repo_id", ""),
"customized_dof_config": self.dof_config,
"customized_agent_pos_config": self.agent_pos_config,
}
train_config = {
# Model and paths configuration
"log_name": self.log_name,
"log_project": self.log_project,
"model_type": self.model_type,
"pretrained_wallx_path": self.pretrained_wallx_path,
"save_path": self.save_path,
"use_fast_tokenizer": self.use_fast_tokenizer,
"action_tokenizer_path": self.action_tokenizer_path,
# Profiling configuration
"profile": self.profile,
"profile_save_path": self.profile_save_path,
"profile_wait_iters": self.profile_wait_iters,
"profile_warmup_iters": self.profile_warmup_iters,
"profile_active_iters": self.profile_active_iters,
# Training hyperparameters
"num_warmup_steps": self.num_warmup_steps,
"num_training_steps": self.num_training_steps,
"learning_rate": self.learning_rate,
"min_lr": self.min_lr,
"num_epoch": self.num_epoch,
"gradient_accumulation_steps": self.gradient_accumulation_steps,
"batch_size_per_gpu": self.batch_size_per_gpu,
"padding_side": self.padding_side,
"epoch_save_interval": self.epoch_save_interval,
# Training optimization
"FSDP2": self.fsdp2,
"torch_compile": self.torch_compile,
# Robot configuration
"dof_config": self.dof_config,
"agent_pos_config": self.agent_pos_config,
# Normalization stats
"norm_stats_path": self.norm_stats_path,
# Customized robot config
"enable_customized_robot_config": self.enable_customized_robot_config,
"customized_robot_config": customized_robot_config,
# Resume configuration
"resume": self.resume_config,
# Data configuration
"data": self.data_config,
}
return train_config
def get_dataload_config(self) -> dict:
"""
Extract data loading configuration from config.
Returns:
dict: Data loading configuration for preprocessing.
"""
return {
"action_horizon": self.data_config.get("action_horizon", self.chunk_size),
"train_test_split": self.data_config.get("train_test_split", 0.95),
"split_seed": 42,
"predict_action_keys": self.data_config.get("predict_action_keys", []),
"obs_action_keys": self.data_config.get("obs_action_keys", []),
"resolution": self.data_config.get("resolution", {}),
"priority_order": None,
"max_length": self.tokenizer_max_length,
}
def get_lerobot_config(self) -> dict:
"""
Extract LeRobot dataset configuration.
Returns:
dict: LeRobot dataset configuration.
"""
return self.data_config.get("lerobot_config", {})
@classmethod
def from_yaml_dict(cls, yaml_dict: dict) -> "WallXConfig":
"""
Create a WallXConfig from a YAML configuration dictionary.
Args:
yaml_dict: Dictionary loaded from YAML training config file.
Returns:
WallXConfig instance with values from YAML.
"""
config_kwargs = {}
# Model and paths
if "log_name" in yaml_dict:
config_kwargs["log_name"] = yaml_dict["log_name"]
if "log_project" in yaml_dict:
config_kwargs["log_project"] = yaml_dict["log_project"]
if "model_type" in yaml_dict:
config_kwargs["model_type"] = yaml_dict["model_type"]
if "pretrained_wallx_path" in yaml_dict:
config_kwargs["pretrained_wallx_path"] = yaml_dict["pretrained_wallx_path"]
if "save_path" in yaml_dict:
config_kwargs["save_path"] = yaml_dict["save_path"]
if "use_fast_tokenizer" in yaml_dict:
config_kwargs["use_fast_tokenizer"] = yaml_dict["use_fast_tokenizer"]
if "action_tokenizer_path" in yaml_dict:
config_kwargs["action_tokenizer_path"] = yaml_dict["action_tokenizer_path"]
# Profiling
if "profile" in yaml_dict:
config_kwargs["profile"] = yaml_dict["profile"]
if "profile_save_path" in yaml_dict:
config_kwargs["profile_save_path"] = yaml_dict["profile_save_path"]
if "profile_wait_iters" in yaml_dict:
config_kwargs["profile_wait_iters"] = yaml_dict["profile_wait_iters"]
if "profile_warmup_iters" in yaml_dict:
config_kwargs["profile_warmup_iters"] = yaml_dict["profile_warmup_iters"]
if "profile_active_iters" in yaml_dict:
config_kwargs["profile_active_iters"] = yaml_dict["profile_active_iters"]
# Training hyperparameters
if "num_warmup_steps" in yaml_dict:
config_kwargs["num_warmup_steps"] = yaml_dict["num_warmup_steps"]
config_kwargs["scheduler_warmup_steps"] = yaml_dict["num_warmup_steps"]
if "num_training_steps" in yaml_dict:
config_kwargs["num_training_steps"] = yaml_dict["num_training_steps"]
config_kwargs["scheduler_decay_steps"] = yaml_dict["num_training_steps"]
if "learning_rate" in yaml_dict:
config_kwargs["learning_rate"] = yaml_dict["learning_rate"]
config_kwargs["optimizer_lr"] = yaml_dict["learning_rate"]
if "min_lr" in yaml_dict:
config_kwargs["min_lr"] = yaml_dict["min_lr"]
config_kwargs["scheduler_decay_lr"] = yaml_dict["min_lr"]
if "num_epoch" in yaml_dict:
config_kwargs["num_epoch"] = yaml_dict["num_epoch"]
if "gradient_accumulation_steps" in yaml_dict:
config_kwargs["gradient_accumulation_steps"] = yaml_dict["gradient_accumulation_steps"]
if "batch_size_per_gpu" in yaml_dict:
config_kwargs["batch_size_per_gpu"] = yaml_dict["batch_size_per_gpu"]
if "padding_side" in yaml_dict:
config_kwargs["padding_side"] = yaml_dict["padding_side"]
if "epoch_save_interval" in yaml_dict:
config_kwargs["epoch_save_interval"] = yaml_dict["epoch_save_interval"]
# Training optimization
if "FSDP2" in yaml_dict:
config_kwargs["fsdp2"] = yaml_dict["FSDP2"]
if "torch_compile" in yaml_dict:
config_kwargs["torch_compile"] = yaml_dict["torch_compile"]
# Robot configuration
if "dof_config" in yaml_dict:
config_kwargs["dof_config"] = yaml_dict["dof_config"]
if "agent_pos_config" in yaml_dict:
config_kwargs["agent_pos_config"] = yaml_dict["agent_pos_config"]
# Normalization stats
if "norm_stats_path" in yaml_dict:
config_kwargs["norm_stats_path"] = yaml_dict["norm_stats_path"]
# Customized robot config
if "enable_customized_robot_config" in yaml_dict:
config_kwargs["enable_customized_robot_config"] = yaml_dict["enable_customized_robot_config"]
if "customized_robot_config" in yaml_dict:
config_kwargs["customized_robot_config"] = yaml_dict["customized_robot_config"]
# Resume config
if "resume" in yaml_dict:
config_kwargs["resume_config"] = yaml_dict["resume"]
# Data configuration
if "data" in yaml_dict:
data = yaml_dict["data"]
data_config = {
"use_lerobot": data.get("use_lerobot", True),
"action_horizon": data.get("action_horizon", 32),
"train_test_split": data.get("train_test_split", 0.95),
"obs_action_keys": data.get("obs_action_keys", []),
"predict_action_keys": data.get("predict_action_keys", []),
"resolution": data.get("resolution", {}),
}
if "lerobot_config" in data:
data_config["lerobot_config"] = data["lerobot_config"]
config_kwargs["data_config"] = data_config
# Set chunk_size from action_horizon
if "action_horizon" in data:
config_kwargs["chunk_size"] = data["action_horizon"]
config_kwargs["n_action_steps"] = data["action_horizon"]
return cls(**config_kwargs)
def get_optimizer_preset(self) -> AdamWConfig:
return AdamWConfig(
lr=self.optimizer_lr,
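Together, `from_yaml_dict` and the `get_*_config` accessors give a round-trip between the original wall-x YAML layout and this dataclass. A minimal sketch, assuming a YAML file in that layout (the filename is hypothetical):

import yaml

from lerobot.policies.wall_x import WallXConfig

with open("wall_x_train.yaml") as f:  # hypothetical path
    config = WallXConfig.from_yaml_dict(yaml.safe_load(f))

train_config = config.get_train_config()  # dict mirroring the YAML structure
lerobot_cfg = config.get_lerobot_config()  # e.g. {"repo_id": ..., "root": ...}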
@@ -0,0 +1,37 @@
#!/usr/bin/env python
# Copyright 2025 HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Wall-X Constants and Configuration Data.
Contains dataset names, key mappings, frequency mappings, and action statistics
for cross-embodiment robotic control.
"""
from pathlib import Path
# Location of the wall-x repo (hardcoded developer path; adjust for your environment)
WALL_X_PATH = Path("/x2robot_v2/vincent/workspace/lerobot_opensource/wall-x")
CAMERA_NAME_MAPPING = {
"face_view": "front view",
"left_wrist_view": "left wrist view",
"right_wrist_view": "right wrist view",
"move1_view": "move view",
"move2_view": "move view",
"wall_view": "wall view",
"top_view": "top view",
}
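These display names are substituted into the observation prompt (see `get_wallx_normal_text` later in this commit), with a fallback to the raw key for unmapped cameras:

CAMERA_NAME_MAPPING.get("face_view", "face_view")  # -> "front view"
CAMERA_NAME_MAPPING.get("overhead_cam", "overhead_cam")  # unmapped -> "overhead_cam"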
File diff suppressed because it is too large
@@ -0,0 +1,248 @@
from transformers.configuration_utils import PretrainedConfig
from transformers.modeling_rope_utils import rope_config_validation
class Qwen2_5_VLVisionConfig(PretrainedConfig):
model_type = "qwen2_5_vl"
base_config_key = "vision_config"
def __init__(
self,
depth=32,
hidden_size=3584,
hidden_act="silu",
intermediate_size=3420,
num_heads=16,
in_channels=3,
patch_size=14,
spatial_merge_size=2,
temporal_patch_size=2,
tokens_per_second=4,
window_size=112,
out_hidden_size=3584,
fullatt_block_indexes=[7, 15, 23, 31],
**kwargs,
):
super().__init__(**kwargs)
self.depth = depth
self.hidden_size = hidden_size
self.hidden_act = hidden_act
self.intermediate_size = intermediate_size
self.num_heads = num_heads
self.in_channels = in_channels
self.patch_size = patch_size
self.spatial_merge_size = spatial_merge_size
self.temporal_patch_size = temporal_patch_size
self.tokens_per_second = tokens_per_second
self.window_size = window_size
self.fullatt_block_indexes = fullatt_block_indexes
self.out_hidden_size = out_hidden_size
class Qwen2_5_VLConfig(PretrainedConfig):
r"""
This is the configuration class to store the configuration of a [`Qwen2_5_VLModel`]. It is used to instantiate a
Qwen2-VL model according to the specified arguments, defining the model architecture. Instantiating a configuration
with the defaults will yield a similar configuration to that of
Qwen2-VL-7B-Instruct [Qwen/Qwen2-VL-7B-Instruct](https://huggingface.co/Qwen/Qwen2-VL-7B-Instruct).
Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
documentation from [`PretrainedConfig`] for more information.
Args:
vocab_size (`int`, *optional*, defaults to 152064):
Vocabulary size of the Qwen2_5_VL model. Defines the number of different tokens that can be represented by the
`inputs_ids` passed when calling [`Qwen2_5_VLModel`]
hidden_size (`int`, *optional*, defaults to 8192):
Dimension of the hidden representations.
intermediate_size (`int`, *optional*, defaults to 29568):
Dimension of the MLP representations.
num_hidden_layers (`int`, *optional*, defaults to 80):
Number of hidden layers in the Transformer encoder.
num_attention_heads (`int`, *optional*, defaults to 64):
Number of attention heads for each attention layer in the Transformer encoder.
num_key_value_heads (`int`, *optional*, defaults to 8):
This is the number of key_value heads that should be used to implement Grouped Query Attention. If
`num_key_value_heads=num_attention_heads`, the model will use Multi Head Attention (MHA), if
`num_key_value_heads=1` the model will use Multi Query Attention (MQA) otherwise GQA is used. When
converting a multi-head checkpoint to a GQA checkpoint, each group key and value head should be constructed
by meanpooling all the original heads within that group. For more details checkout [this
paper](https://arxiv.org/pdf/2305.13245.pdf). If it is not specified, will default to `8`.
hidden_act (`str` or `function`, *optional*, defaults to `"silu"`):
The non-linear activation function (function or string) in the decoder.
max_position_embeddings (`int`, *optional*, defaults to 32768):
The maximum sequence length that this model might ever be used with.
initializer_range (`float`, *optional*, defaults to 0.02):
The standard deviation of the truncated_normal_initializer for initializing all weight matrices.
rms_norm_eps (`float`, *optional*, defaults to 1e-05):
The epsilon used by the rms normalization layers.
use_cache (`bool`, *optional*, defaults to `True`):
Whether or not the model should return the last key/values attentions (not used by all models). Only
relevant if `config.is_decoder=True`.
tie_word_embeddings (`bool`, *optional*, defaults to `False`):
Whether the model's input and output word embeddings should be tied.
rope_theta (`float`, *optional*, defaults to 1000000.0):
The base period of the RoPE embeddings.
use_sliding_window (`bool`, *optional*, defaults to `False`):
Whether to use sliding window attention.
sliding_window (`int`, *optional*, defaults to 4096):
Sliding window attention (SWA) window size. If not specified, will default to `4096`.
max_window_layers (`int`, *optional*, defaults to 80):
The number of layers that use SWA (Sliding Window Attention). The bottom layers use SWA while the top use full attention.
attention_dropout (`float`, *optional*, defaults to 0.0):
The dropout ratio for the attention probabilities.
vision_config (`Dict`, *optional*):
The config for the visual encoder initialization.
rope_scaling (`Dict`, *optional*):
Dictionary containing the scaling configuration for the RoPE embeddings. NOTE: if you apply new rope type
and you expect the model to work on longer `max_position_embeddings`, we recommend you to update this value
accordingly.
Expected contents:
`rope_type` (`str`):
The sub-variant of RoPE to use. Can be one of ['default', 'linear', 'dynamic', 'yarn', 'longrope',
'llama3'], with 'default' being the original RoPE implementation.
`factor` (`float`, *optional*):
Used with all rope types except 'default'. The scaling factor to apply to the RoPE embeddings. In
most scaling types, a `factor` of x will enable the model to handle sequences of length x *
original maximum pre-trained length.
`original_max_position_embeddings` (`int`, *optional*):
Used with 'dynamic', 'longrope' and 'llama3'. The original max position embeddings used during
pretraining.
`attention_factor` (`float`, *optional*):
Used with 'yarn' and 'longrope'. The scaling factor to be applied on the attention
computation. If unspecified, it defaults to value recommended by the implementation, using the
`factor` field to infer the suggested value.
`beta_fast` (`float`, *optional*):
Only used with 'yarn'. Parameter to set the boundary for extrapolation (only) in the linear
ramp function. If unspecified, it defaults to 32.
`beta_slow` (`float`, *optional*):
Only used with 'yarn'. Parameter to set the boundary for interpolation (only) in the linear
ramp function. If unspecified, it defaults to 1.
`short_factor` (`List[float]`, *optional*):
Only used with 'longrope'. The scaling factor to be applied to short contexts (<
`original_max_position_embeddings`). Must be a list of numbers with the same length as the hidden
size divided by the number of attention heads divided by 2
`long_factor` (`List[float]`, *optional*):
Only used with 'longrope'. The scaling factor to be applied to long contexts (>
`original_max_position_embeddings`). Must be a list of numbers with the same length as the hidden
size divided by the number of attention heads divided by 2
`low_freq_factor` (`float`, *optional*):
Only used with 'llama3'. Scaling factor applied to low frequency components of the RoPE
`high_freq_factor` (`float`, *optional*):
Only used with 'llama3'. Scaling factor applied to high frequency components of the RoPE
```python
>>> from transformers import Qwen2_5_VLForConditionalGeneration, Qwen2_5_VLConfig
>>> # Initializing a Qwen2_5_VL style configuration
>>> configuration = Qwen2_5_VLConfig()
>>> # Initializing a model from the Qwen2-VL-7B style configuration
>>> model = Qwen2_5_VLForConditionalGeneration(configuration)
>>> # Accessing the model configuration
>>> configuration = model.config
```"""
model_type = "qwen2_5_vl"
sub_configs = {"vision_config": Qwen2_5_VLVisionConfig}
keys_to_ignore_at_inference = ["past_key_values"]
# Default tensor parallel plan for base model `Qwen2_5_VL`
base_model_tp_plan = {
"layers.*.self_attn.q_proj": "colwise",
"layers.*.self_attn.k_proj": "colwise",
"layers.*.self_attn.v_proj": "colwise",
"layers.*.self_attn.o_proj": "rowwise",
"layers.*.mlp.gate_proj": "colwise",
"layers.*.mlp.up_proj": "colwise",
"layers.*.mlp.down_proj": "rowwise",
}
base_model_pp_plan = {
"embed_tokens": (["input_ids"], ["inputs_embeds"]),
"layers": (["hidden_states", "attention_mask"], ["hidden_states"]),
"norm": (["hidden_states"], ["hidden_states"]),
}
def __init__(
self,
vocab_size=152064,
hidden_size=8192,
intermediate_size=29568,
num_hidden_layers=80,
num_attention_heads=64,
num_key_value_heads=8,
hidden_act="silu",
max_position_embeddings=32768,
initializer_range=0.02,
rms_norm_eps=1e-05,
use_cache=True,
tie_word_embeddings=False,
rope_theta=1000000.0,
use_sliding_window=False,
sliding_window=4096,
max_window_layers=80,
attention_dropout=0.0,
vision_config=None,
rope_scaling=None,
num_experts=4,
experts=None,
dof_config=None,
noise_scheduler=None,
dim_inputs=(1536, 1536),
attention_moe=False,
mlp_moe=False,
**kwargs,
):
if isinstance(vision_config, dict):
self.vision_config = self.sub_configs["vision_config"](**vision_config)
elif vision_config is None:
self.vision_config = self.sub_configs["vision_config"]()
self.vocab_size = vocab_size
self.max_position_embeddings = max_position_embeddings
self.hidden_size = hidden_size
self.intermediate_size = intermediate_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.use_sliding_window = use_sliding_window
self.sliding_window = sliding_window
self.max_window_layers = max_window_layers
# for backward compatibility
if num_key_value_heads is None:
num_key_value_heads = num_attention_heads
self.num_key_value_heads = num_key_value_heads
self.hidden_act = hidden_act
self.initializer_range = initializer_range
self.rms_norm_eps = rms_norm_eps
self.use_cache = use_cache
self.rope_theta = rope_theta
self.attention_dropout = attention_dropout
self.rope_scaling = rope_scaling
self.num_experts = num_experts
self.experts = experts
self.dof_config = dof_config
self.noise_scheduler = noise_scheduler
self.dim_inputs = tuple(dim_inputs)
self.attention_moe = attention_moe
self.mlp_moe = mlp_moe
# Validate the correctness of rotary position embeddings parameters
# BC: if there is a 'type' field, move it to 'rope_type'.
# and change type from 'mrope' to 'default' because `mrope` does default RoPE calculations
# one can set it to "linear"/"dynamic" etc. to have scaled RoPE
# TODO: @raushan update config in the hub
if self.rope_scaling is not None and "type" in self.rope_scaling:
if self.rope_scaling["type"] == "mrope":
self.rope_scaling["type"] = "default"
self.rope_scaling["rope_type"] = self.rope_scaling["type"]
rope_config_validation(self, ignore_keys={"mrope_section"})
super().__init__(tie_word_embeddings=tie_word_embeddings, **kwargs)
__all__ = ["Qwen2_5_VLConfig"]
File diff suppressed because it is too large
@@ -0,0 +1,653 @@
#!/usr/bin/env python
# Copyright 2025 HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Wall-X Utility Functions.
Contains data processing utilities, text formatting functions, and helper classes
for the Wall-X cross-embodiment robotic control model.
"""
import json
import random
import re
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, Union
import torch
from transformers import BatchFeature
from lerobot.policies.wall_x.constant import (
CAMERA_NAME_MAPPING,
FREQUENCY_MAPPING,
KEY_MAPPINGS,
MULTIMODAL_DATASET_NAMES,
)
@dataclass
class X2RDataProcessingConfig:
"""Configuration class for X2R data processing pipeline.
This class contains all the necessary parameters for processing robotic data
including camera mappings, tactile sensor configurations, action predictions,
and various processing options.
"""
# Action prediction configuration
predict_action_keys: List[str] = field(default_factory=list)
obs_action_keys: List[str] = field(default_factory=list)
# Image resolution settings for different views
resolution: Dict[str, int] = field(
default_factory=lambda: {
"face_view": -1,
"left_wrist_view": 128,
"right_wrist_view": 128,
}
)
# Dataset splitting
train_test_split: float = 0.9
split_seed: int = 42
# Instruction handling
priority_order: Optional[Dict[str, float]] = None
# Vision model parameters
model_type: str = "qwen2_5"
max_pixels: int = 16384 * 28 * 28
min_pixels: int = 4 * 28 * 28
image_factor: int = 28
generate_subtask_ratio: float = 0.0
def __post_init__(self):
"""Post-initialization validation and setup."""
# Validate train/test split
if not 0 < self.train_test_split < 1:
raise ValueError(
f"train_test_split must be between 0 and 1, got {self.train_test_split}"
)
def as_dict(self) -> Dict:
"""Convert configuration to dictionary format.
Returns:
Dict: Configuration as dictionary
"""
return self.__dict__
def update(self, **kwargs) -> "X2RDataProcessingConfig":
"""Update configuration parameters.
Args:
**kwargs: Key-value pairs to update
Returns:
X2RDataProcessingConfig: Updated configuration instance
"""
for key, value in kwargs.items():
if hasattr(self, key):
setattr(self, key, value)
else:
raise ValueError(f"Unknown configuration parameter: {key}")
return self
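# A minimal usage sketch (illustrative values; `update` rejects unknown keys):
#
#   cfg = X2RDataProcessingConfig(train_test_split=0.95, split_seed=7)
#   cfg.update(resolution={"face_view": 256, "left_wrist_view": 128})
#   cfg.update(bogus_key=1)  # raises ValueError: Unknown configuration parameter
#   X2RDataProcessingConfig(train_test_split=1.5)  # raises ValueError in __post_init__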
def preprocesser_call(
processor,
images: Optional[Union[List, Any]] = None,
text: Optional[Union[str, List[str]]] = None,
videos: Optional[Union[List, Any]] = None,
padding: Union[bool, str] = False,
truncation: Optional[bool] = None,
max_length: Optional[int] = None,
return_tensors: str = "pt",
) -> BatchFeature:
"""Unified preprocessing function for Wall-X model handling text, image and video inputs.
Processes inputs into format suitable for multimodal transformer models, including:
- Text tokenization and special token handling
- Image/video processing through image processor
- Attention mask and label generation
- Padding and truncation handling
Args:
processor: Multimodal processor containing tokenizer and image processor
images: Input images (PIL, numpy arrays, or torch tensors)
text: Text or list of texts to tokenize
videos: Input videos (numpy arrays or torch tensors)
padding: Whether to pad sequences to same length
truncation: Whether to truncate sequences longer than max_length
max_length: Maximum length for truncation/padding
return_tensors: Format for returned tensors ('pt', 'np', etc.)
Returns:
BatchFeature containing processed inputs with keys:
- input_ids: Tokenized text
- attention_mask: Attention mask for text
- pixel_values: Processed image pixels
- pixel_values_videos: Processed video frames
- image_grid_thw: Image grid dimensions for LLM
- video_grid_thw: Video grid dimensions for LLM
- labels: Training labels with masking
"""
# Process image inputs
if images is not None and len(images) > 0:
image_inputs = processor.image_processor(
images=images, videos=None, return_tensors=return_tensors
)
image_grid_thw = image_inputs["image_grid_thw"]
else:
image_inputs = {}
image_grid_thw = None
# Process video inputs
if videos is not None:
videos_inputs = processor.image_processor(
images=None, videos=videos, return_tensors=return_tensors
)
video_grid_thw = videos_inputs["video_grid_thw"]
else:
videos_inputs = {}
video_grid_thw = None
# Ensure text input is in list format
if not isinstance(text, list):
text = [text]
# Process image placeholder tokens in text
if image_grid_thw is not None:
merge_length = processor.image_processor.merge_size**2
index = 0
for i in range(len(text)):
while "<|image_pad|>" in text[i]:
# Add bounds checking to avoid index overflow
if index >= len(image_grid_thw):
print(
f"Warning: Number of image placeholders ({index + 1}) "
f"exceeds actual images ({len(image_grid_thw)}), "
f"skipping remaining placeholder processing"
)
break
# Replace image placeholder with actual token count
token_count = image_grid_thw[index].prod() // merge_length
text[i] = text[i].replace(
"<|image_pad|>", "<|placeholder|>" * token_count, 1
)
index += 1
text[i] = text[i].replace("<|placeholder|>", "<|image_pad|>")
# Process video placeholder tokens in text
if video_grid_thw is not None:
merge_length = processor.image_processor.merge_size**2
index = 0
for i in range(len(text)):
while "<|video_pad|>" in text[i]:
# Replace video placeholder with actual token count
token_count = video_grid_thw[index].prod() // merge_length
text[i] = text[i].replace(
"<|video_pad|>", "<|placeholder|>" * token_count, 1
)
index += 1
text[i] = text[i].replace("<|placeholder|>", "<|video_pad|>")
# Tokenize complete input text
text_inputs = processor.tokenizer(
text,
return_tensors=return_tensors,
padding=padding,
truncation=truncation,
max_length=max_length,
)
# Get pad token ID for label generation
pad_token_id = processor.tokenizer.pad_token_id
if pad_token_id is None:
pad_token_id = processor.tokenizer.eos_token_id
# Generate labels for multi-turn dialogue, keeping only assistant response loss
labels = torch.full_like(text_inputs.input_ids, -100)
assistant_marker = "<|im_start|>assistant\n"
im_end_token_id = processor.tokenizer.convert_tokens_to_ids("<|im_end|>")
assistant_tokens = processor.tokenizer(
assistant_marker, add_special_tokens=False
).input_ids
for i in range(len(text)):
assistant_regions = []
parts = text[i].split(assistant_marker)
# Process each part to determine which tokens belong to assistant responses
# Count left padding tokens
num_left_pads = 0
for token_id in text_inputs.input_ids[i]:
if token_id == pad_token_id:
num_left_pads += 1
else:
break
current_pos = num_left_pads
for j, part in enumerate(parts):
part_tokens = processor.tokenizer(part, add_special_tokens=False).input_ids
if j == 0:
# First part is system prompt or user question, all labels are -100
current_pos += len(part_tokens)
continue
# From second part onwards, each part starts with assistant response
for k in range(current_pos + 1, len(text_inputs.input_ids[i])):
if text_inputs.input_ids[i][k] == im_end_token_id:
assistant_regions.append(
(current_pos + len(assistant_tokens), k + 2)
)
break
current_pos += len(part_tokens) + 3
# Set labels for assistant response regions
for start, end in assistant_regions:
labels[i][start:end] = text_inputs.input_ids[i][start:end]
# Mask special action tokens in labels
action_token_id = processor.tokenizer.encode("<|action|>")[0]
propri_token_id = processor.tokenizer.encode("<|propri|>")[0]
labels[labels == action_token_id] = -100
labels[labels == propri_token_id] = -100
labels[labels == processor.tokenizer.pad_token_id] = -100
# Set labels to None if all are invalid to skip cross entropy loss
if (labels != -100).any().item():
text_inputs["labels"] = labels
else:
text_inputs["labels"] = None
return BatchFeature(data={**text_inputs, **image_inputs, **videos_inputs})
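# Example call (a sketch; `processor` is assumed to be the Qwen2.5-VL processor
# from transformers.AutoProcessor, and `img` a PIL.Image):
#
#   batch = preprocesser_call(
#       processor,
#       images=[img],
#       text=["<|im_start|>user\n<|vision_start|><|image_pad|><|vision_end|>"
#             "Describe the scene.<|im_end|>\n<|im_start|>assistant\nA robot arm.<|im_end|>"],
#       padding=True,
#       max_length=256,
#   )
#   # batch.input_ids, batch.attention_mask, batch.pixel_values, batch.labels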
def process_grounding_points(
text: str,
orig_height: int,
orig_width: int,
resized_height: int,
resized_width: int,
model_type: str,
) -> str:
"""Process grounding point coordinates in text based on image resizing.
Adjusts coordinate values in <point> tags to match resized image dimensions
for different model types (qwen2, qwen2_5).
Args:
text: Input text containing <point> tags with coordinates
orig_height: Original image height
orig_width: Original image width
resized_height: Resized image height
resized_width: Resized image width
model_type: Model type for coordinate processing ('qwen2' or 'qwen2_5')
Returns:
Text with adjusted coordinate values
"""
# Regex pattern to match <point> tags and their contents
point_pattern = re.compile(r"<point>(.*?)</point>")
def process_match(match):
"""Process a single point match and adjust coordinates."""
coords_str = match.group(1)
try:
# Extract coordinates from string
coords = list(map(int, re.findall(r"\d+", coords_str)))
# Calculate resize scale factors
scale_w = resized_width / orig_width
scale_h = resized_height / orig_height
if len(coords) == 2:
x, y = coords
if model_type == "qwen2_5":
# Qwen2.5 uses pixel coordinates
new_x = max(0, min(round(x * scale_w), resized_width - 1))
new_y = max(0, min(round(y * scale_h), resized_height - 1))
elif model_type == "qwen2":
# Qwen2 normalizes to [0, 1000) range
new_x = max(0, min(999.999, (x / orig_width) * 1000))
new_y = max(0, min(999.999, (y / orig_height) * 1000))
else:
raise ValueError(f"Unsupported model type: {model_type}")
coords = [new_x, new_y]
elif len(coords) == 4:
x1, y1, x2, y2 = coords
if model_type == "qwen2_5":
new_x1 = max(0, min(round(x1 * scale_w), resized_width - 1))
new_y1 = max(0, min(round(y1 * scale_h), resized_height - 1))
new_x2 = max(0, min(round(x2 * scale_w), resized_width - 1))
new_y2 = max(0, min(round(y2 * scale_h), resized_height - 1))
elif model_type == "qwen2":
new_x1 = max(0, min(999.999, (x1 / orig_width) * 1000))
new_y1 = max(0, min(999.999, (y1 / orig_height) * 1000))
new_x2 = max(0, min(999.999, (x2 / orig_width) * 1000))
new_y2 = max(0, min(999.999, (y2 / orig_height) * 1000))
else:
raise ValueError(f"Unsupported model type: {model_type}")
coords = [new_x1, new_y1, new_x2, new_y2]
# Return processed point tag
return f'<point>[{", ".join(map(str, coords))}]</point>'
except (ValueError, TypeError):
# Return original content if processing fails
return match.group(0)
# Replace all matching point tags
processed_text = point_pattern.sub(process_match, text)
return processed_text
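# Worked example (qwen2_5 mode): resizing a 640x480 image to 320x240 halves both
# axes, so a point at (100, 50) maps to (50, 25):
#
#   process_grounding_points(
#       "<point>[100, 50]</point>",
#       orig_height=480, orig_width=640,
#       resized_height=240, resized_width=320,
#       model_type="qwen2_5",
#   )  # -> '<point>[50, 25]</point>'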
def get_frame_instruction(
instruction_info: Dict[str, Any],
frame_idx: Optional[int] = None,
truncate_keys: Optional[List[str]] = None,
) -> Tuple[Dict[str, Any], Optional[int]]:
"""Extract frame-specific instruction from instruction dictionary.
Args:
instruction_info: Dictionary containing instruction components
frame_idx: Current frame index
truncate_keys: Keys that trigger truncation when found
Returns:
Tuple of (frame_instruction_dict, split_end_frame)
"""
if truncate_keys is None:
truncate_keys = [
"subtask_generation",
"distribute",
"subtask_generation_zh",
"distribute_zh",
]
instruction_for_frame = {}
split_end = None
for key, value in instruction_info.items():
if isinstance(value, dict):
# Handle frame-range specific instructions
for frame_range, frame_instruction in value.items():
start_frame, end_frame = map(int, frame_range.split(" "))
if start_frame <= frame_idx < end_frame or (start_frame == frame_idx):
instruction_for_frame[key] = frame_instruction
if (
truncate_keys is not None
and split_end is None
and key in truncate_keys
):
split_end = end_frame + 1
break
else:
instruction_for_frame[key] = value
return instruction_for_frame, split_end
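# Sketch: with
#   instruction_info = {"instruction": "clean the table",
#                       "subtask_generation": {"0 30": "pick up the cup"}}
# and frame_idx=10, the "0 30" range matches (0 <= 10 < 30), so this returns
#   ({"instruction": "clean the table", "subtask_generation": "pick up the cup"}, 31)
# where split_end = end_frame + 1 because "subtask_generation" is a truncate key.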
def get_task_instruction(
frame_instruction_info: Dict[str, Any], priority_order: Optional[OrderedDict] = None
) -> str:
"""Construct task instruction from available instruction fields using priority sampling.
Args:
frame_instruction_info: Dictionary containing instruction fields
priority_order: OrderedDict specifying sampling probability for each field
Returns:
Combined instruction string with priority components
"""
# Default priority settings
default_priority_order = OrderedDict(
{
"subtask_generation": 0.25,
"subtask_generation_zh": 0.25,
"distribute": 0.25,
"distribute_zh": 0.25,
}
)
if priority_order is not None:
priority_order = OrderedDict(priority_order)
else:
priority_order = default_priority_order
got_instruction = False
task_instruction = ""
# Sample instruction components based on priority probabilities
for key, prob in priority_order.items():
if key in frame_instruction_info and frame_instruction_info[key] != "":
if got_instruction:
if random.random() >= prob:
continue
task_instruction += f"\n{frame_instruction_info[key]}"
got_instruction = True
break
# Fall back to base instruction if no priority components found
if not got_instruction:
task_instruction = frame_instruction_info.get("instruction", "")
return task_instruction
def get_wallx_normal_text(
instruction_info: Dict[str, Any],
action_chunk_size: int,
frame_idx: int,
priority_order: Optional[OrderedDict] = None,
cam_mapping: Optional[Dict[str, str]] = None,
generate_subtask_ratio: float = 0.0,
) -> Tuple[str, bool]:
"""Construct complete multimodal prompt text for Wall-X model.
Formats input using special tokens including:
- System message
- User observations (with image placeholders)
- Task instructions
- Proprioception prompts
- Assistant responses (with action tokens)
Args:
instruction_info: Dictionary containing instruction components
action_chunk_size: Number of action tokens to generate
frame_idx: Current frame index
priority_order: Priority order for instruction sampling
cam_mapping: Camera name mapping dictionary
generate_subtask_ratio: Probability of generating subtask instead of actions
Returns:
Tuple of (formatted_prompt_text, is_subtask_generation)
"""
# Special tokens for formatting
role_start_symbol = "<|im_start|>"
role_end_symbol = "<|im_end|>"
vision_start_symbol = "<|vision_start|>"
vision_end_symbol = "<|vision_end|>"
image_pad_symbol = "<|image_pad|>"
propri_symbol = "<|propri|>"
action_symbol = "<|action|>"
action_fast_symbol = "<|action_fast|>"
# System prologue
prologue = (
f"{role_start_symbol}system\nYou are a helpful assistant.{role_end_symbol}\n"
)
# User request with observation
user_request = f"{role_start_symbol}user\nObservation:"
if cam_mapping:
for _, cam_name in cam_mapping.items():
view_name = CAMERA_NAME_MAPPING.get(cam_name, cam_name)
user_request += f" {view_name}: {vision_start_symbol}{image_pad_symbol}{vision_end_symbol}"
user_request += "\nInstruction:"
# Get frame-specific instruction
frame_instruction_info, _ = get_frame_instruction(
instruction_info, frame_idx=frame_idx
)
generate_subtask = False
priority_keys = ["subtask_generation", "distribute"]
# Decide whether to generate subtask or actions
if (
bool(set(frame_instruction_info.keys()) & set(priority_keys))
and random.random() < generate_subtask_ratio
):
# Generate subtask (equivalent to VQA task)
instruction = frame_instruction_info.get("instruction", "")
text_prompt = "\nPredict the next action in language.\n"
user_message = f"{user_request} {instruction}{text_prompt}{role_end_symbol}\n"
# Find output instruction from priority keys
for key in priority_keys:
if key in frame_instruction_info:
output_instruction = frame_instruction_info[key]
break
assistant_output = (
f"{role_start_symbol}assistant\n{output_instruction}\n{role_end_symbol}"
)
generate_subtask = True
else:
# Generate actions
instruction = get_task_instruction(
frame_instruction_info, priority_order=priority_order
)
text_prompt = f"\nPredict the next action in robot action.\nProprioception: {propri_symbol}\n"
user_message = f"{user_request} {instruction}{text_prompt}{role_end_symbol}\n"
assistant_output = f"{role_start_symbol}assistant\n{action_fast_symbol}{role_end_symbol}\n{action_symbol * action_chunk_size}"
complete_text = prologue + user_message + assistant_output
return complete_text, generate_subtask
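# Shape of the action branch's prompt (one camera mapped to "front view",
# action_chunk_size=2; '\n' shown literally):
#
#   <|im_start|>system\nYou are a helpful assistant.<|im_end|>\n
#   <|im_start|>user\nObservation: front view: <|vision_start|><|image_pad|><|vision_end|>\n
#   Instruction: <task text>\nPredict the next action in robot action.\n
#   Proprioception: <|propri|>\n<|im_end|>\n
#   <|im_start|>assistant\n<|action_fast|><|im_end|>\n<|action|><|action|>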
def get_action_tokens(
normalized_actions: Union[torch.Tensor, List], action_tokenizer
) -> List[List[str]]:
"""Convert normalized actions to action token strings.
Args:
normalized_actions: Normalized action arrays/tensors
action_tokenizer: Tokenizer for converting actions to tokens
Returns:
List of action token string lists for each sample
"""
if isinstance(normalized_actions, torch.Tensor):
normalized_actions = normalized_actions.cpu().numpy()
all_action_tokens = []
for i in range(len(normalized_actions)):
if isinstance(normalized_actions[i], torch.Tensor):
normalized_actions[i] = normalized_actions[i].cpu().numpy()
token_id = action_tokenizer(normalized_actions[i])
action_tokens = [f"<|action_token_{j}|>" for j in token_id[0]]
all_action_tokens.append(action_tokens)
return all_action_tokens
def pad_action_token_strs(
actions_token_lists: List[List[str]], pad_token: str = "<|endoftext|>"
) -> List[str]:
"""Pad action token lists to same length and join as strings.
Args:
actions_token_lists: List of action token lists for each sample
pad_token: Token used for padding
Returns:
List of padded action token strings
"""
max_len = max(len(tokens) for tokens in actions_token_lists)
padded_action_strs = []
for tokens in actions_token_lists:
padded_tokens = (
tokens + ["<|im_end|>\n"] + [pad_token] * (max_len - len(tokens))
)
padded_action_strs.append("".join(padded_tokens))
return padded_action_strs
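# e.g. two samples with 2 and 1 action tokens (max_len = 2):
#   [["<|action_token_5|>", "<|action_token_9|>"], ["<|action_token_7|>"]]
# ->
#   ["<|action_token_5|><|action_token_9|><|im_end|>\n",
#    "<|action_token_7|><|im_end|>\n<|endoftext|>"]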
def replace_action_token(
text: List[str],
norm_action: Optional[torch.Tensor],
action_tokenizer,
dataset_names: List[str],
dof_masks: Optional[torch.Tensor] = None,
) -> List[str]:
"""Replace action placeholders in text with actual action tokens.
Args:
text: List of text strings with action placeholders
norm_action: Normalized action tensors
action_tokenizer: Tokenizer for converting actions to tokens
dataset_names: Names of datasets for each sample
dof_masks: Masks for degrees of freedom
Returns:
List of text strings with action tokens replaced
"""
# Filter out multimodal dataset names
dataset_names = [
name for name in dataset_names if name not in MULTIMODAL_DATASET_NAMES
]
# Get required action chunk sizes
required_chunk_sizes = [32] * len(dataset_names)  # fixed chunk size of 32 per sample
if action_tokenizer is not None and norm_action is not None:
# Extract actions based on chunk sizes and DOF masks
norm_action = [
action[: required_chunk_sizes[i], dof_masks[i, 0].bool()]
for i, action in enumerate(norm_action)
]
# Convert to action tokens and pad
actions_fast_tokens = get_action_tokens(norm_action, action_tokenizer)
actions_fast_token_strs = pad_action_token_strs(actions_fast_tokens)
# Replace action placeholders with actual tokens
actions_fast_token_idx = 0
for i in range(len(text)):
if "<|action_fast|>" in text[i]:
text[i] = text[i].replace(
"<|action_fast|><|im_end|>\n",
actions_fast_token_strs[actions_fast_token_idx],
)
actions_fast_token_idx += 1
# Remove remaining action placeholders
text = [t.replace("<|action|>", "") for t in text]
else:
# Remove action placeholders when no tokenizer available
text = [t.replace("<|action_fast|><|im_end|>\n", "") for t in text]
return text
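# Sketch of the flow branch (action_tokenizer=None): the FAST placeholder is
# stripped while the per-step <|action|> tokens are kept, presumably as slots
# for the flow head (dataset name below is hypothetical):
#
#   replace_action_token(
#       ["...<|action_fast|><|im_end|>\n<|action|><|action|>"],
#       norm_action=None, action_tokenizer=None,
#       dataset_names=["example_dataset"],
#   )  # -> ["...<|action|><|action|>"]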