Compare commits

...

9 Commits

Author SHA1 Message Date
Michel Aractingi b6050e6242 Merge pr-2593 (wallx support) into openarms_tmp_rebase 2025-12-17 16:52:11 +01:00
Pepijn 2ef1de78b6 Merge branch 'main' into main 2025-12-09 09:41:02 +01:00
Geoffrey19 2852b968b9 add wallx dependencies 2025-12-07 15:30:14 +08:00
Geoffrey19 56d20caa1e fixed dtype bugs 2025-12-07 15:30:13 +08:00
Geoffrey19 b4a7586b27 reduce to least config and params & pass lerobot basic test 2025-12-07 15:30:13 +08:00
Geoffrey19 78995621fa update the policy methods 2025-12-07 15:30:12 +08:00
Geoffrey19 5be8b6de6b incorporate wallx model into lerobot 2025-12-07 15:30:11 +08:00
Geoffrey19 b185fa0f87 fix bugs in flow 2025-12-07 15:30:10 +08:00
vincentchen 73a6f20e58 support wallx 2025-12-07 15:29:23 +08:00
14 changed files with 6580 additions and 2 deletions
+13
View File
@@ -122,6 +122,18 @@ intelrealsense = [
phone = ["hebi-py>=2.8.0,<2.12.0", "teleop>=0.1.0,<0.2.0", "fastapi<1.0"]
# Policies
wallx = [
"torch==2.6.0",
"torchvision==0.21.0",
"torchaudio==2.6.0",
"transformers==4.49.0",
"accelerate==1.10.1",
"peft==0.17.1",
"scipy==1.15.3",
"torchdiffeq==0.2.5",
"qwen_vl_utils==0.0.11",
"flash-attn==2.7.4.post1"
]
pi = ["transformers @ git+https://github.com/huggingface/transformers.git@fix/lerobot_openpi"]
smolvla = ["lerobot[transformers-dep]", "num2words>=0.5.14,<0.6.0", "accelerate>=1.7.0,<2.0.0", "safetensors>=0.4.3,<1.0.0"]
groot = [
@@ -162,6 +174,7 @@ all = [
"lerobot[reachy2]",
"lerobot[kinematics]",
"lerobot[intelrealsense]",
"lerobot[wallx]",
"lerobot[pi]",
"lerobot[smolvla]",
# "lerobot[groot]", TODO(Steven): Gr00t requires specific installation instructions for flash-attn
+2
View File
@@ -22,6 +22,7 @@ from .smolvla.processor_smolvla import SmolVLANewLineProcessor
from .tdmpc.configuration_tdmpc import TDMPCConfig as TDMPCConfig
from .vqbet.configuration_vqbet import VQBeTConfig as VQBeTConfig
from .xvla.configuration_xvla import XVLAConfig as XVLAConfig
from .wall_x.configuration_wall_x import WallXConfig as WallXConfig
__all__ = [
"ACTConfig",
@@ -33,4 +34,5 @@ __all__ = [
"VQBeTConfig",
"GrootConfig",
"XVLAConfig",
"WallXConfig",
]
+18 -2
View File
@@ -42,6 +42,7 @@ from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig
from lerobot.policies.utils import validate_visual_features_consistency
from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig
from lerobot.policies.xvla.configuration_xvla import XVLAConfig
from lerobot.policies.wall_x.configuration_wall_x import WallXConfig
from lerobot.processor import PolicyAction, PolicyProcessorPipeline
from lerobot.processor.converters import (
batch_to_transition,
@@ -61,7 +62,7 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
Args:
name: The name of the policy. Supported names are "tdmpc", "diffusion", "act",
"vqbet", "pi0", "pi05", "sac", "reward_classifier", "smolvla".
"vqbet", "pi0", "pi05", "sac", "reward_classifier", "smolvla", "wall_x".
Returns:
The policy class corresponding to the given name.
@@ -113,6 +114,10 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
from lerobot.policies.xvla.modeling_xvla import XVLAPolicy
return XVLAPolicy
elif name == "wall_x":
from lerobot.policies.wall_x.modeling_wall_x import WallXPolicy
return WallXPolicy
else:
try:
return _get_policy_cls_from_policy_name(name=name)
@@ -130,7 +135,7 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
Args:
policy_type: The type of the policy. Supported types include "tdmpc",
"diffusion", "act", "vqbet", "pi0", "pi05", "sac", "smolvla",
"reward_classifier".
"reward_classifier", "wall_x".
**kwargs: Keyword arguments to be passed to the configuration class constructor.
Returns:
@@ -161,6 +166,8 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
return GrootConfig(**kwargs)
elif policy_type == "xvla":
return XVLAConfig(**kwargs)
elif policy_type == "wall_x":
return WallXConfig(**kwargs)
else:
try:
config_cls = PreTrainedConfig.get_choice_class(policy_type)
@@ -344,6 +351,7 @@ def make_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, XVLAConfig):
from lerobot.policies.xvla.processor_xvla import (
make_xvla_pre_post_processors,
@@ -353,6 +361,14 @@ def make_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
elif isinstance(policy_cfg, WallXConfig):
from lerobot.policies.wall_x.processor_wall_x import make_wall_x_pre_post_processors
processors = make_wall_x_pre_post_processors(
config=policy_cfg,
dataset_stats=kwargs.get("dataset_stats"),
)
else:
try:
+35
View File
@@ -0,0 +1,35 @@
# WALL-OSS
This repository contains the Hugging Face port of **WALL-OSS**, a Vision-Language-Action model for cross-embodiment robotic control based on Qwen2.5-VL with flow matching/FAST action prediction.
---
## Model Overview
| Feature | Description |
| -------------------- | ------------------------------------------------------------------------ |
| Base Model | Qwen2.5-VL (Vision-Language Model) |
| Action Prediction | Flow Matching (diffusion) or FAST (discrete tokens) |
| Architecture | Mixture of Experts (MoE) with action-specific routing |
| Multi-Modal Inputs | Vision (images/videos), Language, Proprioception |
---
## Citation
If you use this work, please cite:
```bibtex
@article{zhai2025igniting,
title = {Igniting VLMs Toward the Embodied Space},
author = {Zhai, Andy and Liu, Brae and Fang, Bruno and Cai, Chalse and Ma, Ellie and Yin, Ethan and Wang, Hao and Zhou, Hugo and Wang, James and Shi, Lights and Liang, Lucy and Wang, Make and Wang, Qian and Gan, Roy and Yu, Ryan and Li, Shalfun and Liu, Starrick and Chen, Sylas and Chen, Vincent and Xu, Zach},
journal = {arXiv preprint arXiv:2509.11766},
year = {2025}
}
```
---
## License
This port follows the **Apache 2.0 License**.
+21
View File
@@ -0,0 +1,21 @@
#!/usr/bin/env python
# Copyright 2025 Physical Intelligence and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .configuration_wall_x import WallXConfig
from .modeling_wall_x import WallXPolicy
from .processor_wall_x import make_wall_x_pre_post_processors
__all__ = ["WallXConfig", "WallXPolicy", "make_wall_x_pre_post_processors"]
@@ -0,0 +1,162 @@
# Copyright 2025 HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
from lerobot.optim.optimizers import AdamWConfig
from lerobot.optim.schedulers import CosineDecayWithWarmupSchedulerConfig
@PreTrainedConfig.register_subclass("wall_x")
@dataclass
class WallXConfig(PreTrainedConfig):
    """Configuration class for the Wall-X policy.

    Wall-X is based on Qwen2.5-VL with action prediction using flow matching
    ("diffusion" mode) or discrete FAST tokens ("fast" mode). It supports
    cross-embodiment robotic control through unified action representations
    and multi-modal learning with vision, language, and action data.
    """

    # ==================== Input / Output Structure ====================
    n_obs_steps: int = 1
    chunk_size: int = 32  # action_horizon in wall-x
    n_action_steps: int = 32

    # Action dimension - wall-x pads actions/state to 20
    max_action_dim: int = 20
    max_state_dim: int = 20  # For proprioception

    normalization_mapping: dict[str, NormalizationMode] = field(
        default_factory=lambda: {
            "VISUAL": NormalizationMode.IDENTITY,
            "STATE": NormalizationMode.MEAN_STD,
            "ACTION": NormalizationMode.MEAN_STD,
        }
    )

    # ==================== Action Prediction ====================
    # Pretrained model paths
    pretrained_name_or_path: str = "x-square-robot/wall-oss-flow"
    # Action prediction mode: "diffusion" or "fast"
    prediction_mode: str = "diffusion"
    # Tokenizer settings
    use_fast_tokenizer: bool = False  # True: train FAST, False: train Flow
    action_tokenizer_path: str | None = None  # Path to action tokenizer (for FAST mode)

    # ==================== Optimizer Presets ====================
    optimizer_lr: float = 2e-5
    optimizer_betas: tuple[float, float] = (0.9, 0.95)
    optimizer_eps: float = 1e-8
    optimizer_weight_decay: float = 0.01
    optimizer_grad_clip_norm: float = 1.0
    scheduler_warmup_steps: int = 1000
    scheduler_decay_steps: int = 100000
    scheduler_decay_lr: float = 1e-6

    def __post_init__(self):
        """Validate the configuration after dataclass initialization.

        Raises:
            ValueError: If `n_action_steps` exceeds `chunk_size`, if
                `prediction_mode` is not one of "diffusion"/"fast", or if
                `prediction_mode` contradicts `use_fast_tokenizer`.
        """
        super().__post_init__()

        # Input validation
        if self.n_action_steps > self.chunk_size:
            raise ValueError(
                f"The chunk size is the upper bound for the number of action steps per model invocation. Got "
                f"{self.n_action_steps} for `n_action_steps` and {self.chunk_size} for `chunk_size`."
            )
        if self.prediction_mode not in ["diffusion", "fast"]:
            raise ValueError(
                f"prediction_mode must be 'diffusion' or 'fast', got {self.prediction_mode}"
            )

        # `use_fast_tokenizer` and `prediction_mode` encode the same choice.
        # The previous implementation unconditionally overwrote
        # `prediction_mode` from `use_fast_tokenizer`, which made the
        # validation above dead code and silently discarded an explicit user
        # setting. Reject inconsistent combinations instead so that
        # misconfigurations surface immediately.
        expected_mode = "fast" if self.use_fast_tokenizer else "diffusion"
        if self.prediction_mode != expected_mode:
            raise ValueError(
                f"Inconsistent config: use_fast_tokenizer={self.use_fast_tokenizer} implies "
                f"prediction_mode='{expected_mode}', but got '{self.prediction_mode}'."
            )

    def validate_features(self) -> None:
        """Validate and set up input/output features.

        Requires at least one visual input feature. Missing state/action
        features are added with shapes padded to `max_state_dim` /
        `max_action_dim`; existing ones are checked against those bounds.
        """
        image_features = [key for key, feat in self.input_features.items() if feat.type == FeatureType.VISUAL]
        if not image_features:
            raise ValueError(
                "Wall-X policy requires at least one visual input feature. "
                "No features of type FeatureType.VISUAL found in input_features."
            )
        if "observation.state" not in self.input_features:
            state_feature = PolicyFeature(
                type=FeatureType.STATE,
                shape=(self.max_state_dim,),  # Padded to max_state_dim
            )
            self.input_features["observation.state"] = state_feature
        else:
            state_shape = self.input_features["observation.state"].shape
            state_dim = state_shape[0] if state_shape else 0
            if state_dim > self.max_state_dim:
                raise ValueError(
                    f"State dimension {state_dim} exceeds max_state_dim {self.max_state_dim}. "
                    f"Either reduce state dimension or increase max_state_dim in config."
                )
        if "action" not in self.output_features:
            action_feature = PolicyFeature(
                type=FeatureType.ACTION,
                shape=(self.max_action_dim,),  # Padded to max_action_dim
            )
            self.output_features["action"] = action_feature
        else:
            action_shape = self.output_features["action"].shape
            action_dim = action_shape[0] if action_shape else 0
            if action_dim > self.max_action_dim:
                raise ValueError(
                    f"Action dimension {action_dim} exceeds max_action_dim {self.max_action_dim}. "
                    f"Either reduce action dimension or increase max_action_dim in config."
                )

    def get_optimizer_preset(self) -> AdamWConfig:
        """Return the AdamW optimizer preset built from the `optimizer_*` fields."""
        return AdamWConfig(
            lr=self.optimizer_lr,
            betas=self.optimizer_betas,
            eps=self.optimizer_eps,
            weight_decay=self.optimizer_weight_decay,
            grad_clip_norm=self.optimizer_grad_clip_norm,
        )

    def get_scheduler_preset(self):
        """Return the cosine-decay-with-warmup scheduler preset."""
        return CosineDecayWithWarmupSchedulerConfig(
            peak_lr=self.optimizer_lr,
            decay_lr=self.scheduler_decay_lr,
            num_warmup_steps=self.scheduler_warmup_steps,
            num_decay_steps=self.scheduler_decay_steps,
        )

    @property
    def observation_delta_indices(self) -> list | None:
        # Only the current observation is used (no temporal deltas).
        return None

    @property
    def action_delta_indices(self) -> list:
        # Predict a full action chunk: temporal offsets 0..chunk_size-1.
        return list(range(self.chunk_size))

    @property
    def reward_delta_indices(self) -> None:
        # Wall-X does not use reward supervision.
        return None
+43
View File
@@ -0,0 +1,43 @@
#!/usr/bin/env python
# Copyright 2025 HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Wall-X Constants and Configuration Data.
"""
from lerobot.utils.constants import OBS_STATE, OBS_IMAGES, ACTION
# Maps internal camera/feature keys to the natural-language view names that
# are inserted into the Wall-X (Qwen2.5-VL) text prompt.
CAMERA_NAME_MAPPING = {
    "face_view": "front view",
    "left_wrist_view": "left wrist view",
    "right_wrist_view": "right wrist view",
    "move1_view": "move view",
    "move2_view": "move view",
    "wall_view": "wall view",
    "top_view": "top view",
}

# Default square image resolution (pixels) used during preprocessing.
RESOLUTION = 256

# Parameters for preprocessing.
# Pixel-count bounds expressed in multiples of 28x28 blocks; 28 presumably
# matches the Qwen2.5-VL vision patch granularity — TODO confirm.
MAX_PIXELS = 16384 * 28 * 28
MIN_PIXELS = 4 * 28 * 28
IMAGE_FACTOR = 28

# No camera-priority ordering by default (NOTE(review): expected schema of a
# non-None value is not shown here — verify against the data pipeline).
PRIORITY_ORDER = None
# Fraction of samples for which a subtask instruction is generated (disabled).
GENERATE_SUBTASK_RATIO = 0.0
MODEL_TYPE = "qwen2_5"
# Maximum token length for the text tokenizer.
TOKENIZER_MAX_LENGTH = 768
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,135 @@
#!/usr/bin/env python
# Copyright 2025 HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
import torch
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from lerobot.policies.wall_x.configuration_wall_x import WallXConfig
from lerobot.processor import (
AddBatchDimensionProcessorStep,
ComplementaryDataProcessorStep,
DeviceProcessorStep,
NormalizerProcessorStep,
PolicyAction,
PolicyProcessorPipeline,
ProcessorStepRegistry,
RenameObservationsProcessorStep,
UnnormalizerProcessorStep,
)
from lerobot.processor.converters import policy_action_to_transition, transition_to_policy_action
from lerobot.utils.constants import POLICY_POSTPROCESSOR_DEFAULT_NAME, POLICY_PREPROCESSOR_DEFAULT_NAME
def make_wall_x_pre_post_processors(
    config: WallXConfig,
    dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
) -> tuple[
    PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
    PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
    """
    Constructs pre-processor and post-processor pipelines for the Wall-X policy.
    The pre-processing pipeline prepares input data for the model by:
    1. Renaming features to match pretrained configurations (currently an empty rename map)
    2. Adding a batch dimension
    3. Formatting the task description for Wall-X (see `WallXTaskProcessor`)
    4. Normalizing input and output features based on dataset statistics
    5. Moving all data to the device from `config.device`
    The post-processing pipeline handles the model's output by:
    1. Unnormalizing the output actions to their original scale
    2. Moving data to the CPU
    Args:
        config: The configuration object for the Wall-X policy
        dataset_stats: A dictionary of statistics for normalization; may be None
    Returns:
        A tuple containing the configured pre-processor and post-processor pipelines
    """
    input_steps = [
        RenameObservationsProcessorStep(rename_map={}),
        AddBatchDimensionProcessorStep(),
        WallXTaskProcessor(),  # Process task description (class defined later in this module)
        NormalizerProcessorStep(
            # Outputs are normalized here too so training targets match the model's scale.
            features={**config.input_features, **config.output_features},
            norm_map=config.normalization_mapping,
            stats=dataset_stats,
        ),
        DeviceProcessorStep(device=config.device),
    ]
    output_steps = [
        UnnormalizerProcessorStep(
            features=config.output_features,
            norm_map=config.normalization_mapping,
            stats=dataset_stats
        ),
        DeviceProcessorStep(device="cpu"),
    ]
    return (
        PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
            steps=input_steps,
            name=POLICY_PREPROCESSOR_DEFAULT_NAME,
        ),
        PolicyProcessorPipeline[PolicyAction, PolicyAction](
            steps=output_steps,
            name=POLICY_POSTPROCESSOR_DEFAULT_NAME,
            to_transition=policy_action_to_transition,
            to_output=transition_to_policy_action,
        ),
    )
@ProcessorStepRegistry.register(name="wall_x_task_processor")
class WallXTaskProcessor(ComplementaryDataProcessorStep):
    """
    A processor step that ensures the task description is properly formatted for Wall-X.
    This step handles task preprocessing similar to Qwen-VL requirements.
    """

    def complementary_data(self, complementary_data):
        """Return `complementary_data` with a normalized "task" entry.

        Behavior:
        - Missing "task" key: input is returned unchanged.
        - `None` task: replaced with a default instruction.
        - `str` task: a trailing period is appended when missing.
        - `list[str]` task: each entry gets a trailing period when missing.
        - Any other task type is passed through untouched.

        The input mapping is never mutated; a shallow copy is returned
        whenever a "task" key is present. (The previous implementation
        mutated the caller's dict in place in the `None` branch.)
        """
        if "task" not in complementary_data:
            return complementary_data

        new_complementary_data = dict(complementary_data)
        task = new_complementary_data["task"]

        if task is None:
            # Provide default task if none specified
            new_complementary_data["task"] = "Execute the robot action."
        elif isinstance(task, str):
            # Single string: ensure proper formatting
            if not task.endswith("."):
                new_complementary_data["task"] = f"{task}."
        elif isinstance(task, list) and all(isinstance(t, str) for t in task):
            # List of strings: format each
            new_complementary_data["task"] = [
                t if t.endswith(".") else f"{t}." for t in task
            ]
        return new_complementary_data

    def transform_features(
        self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
        """No feature keys are added or removed by this step."""
        return features
@@ -0,0 +1,248 @@
from transformers.configuration_utils import PretrainedConfig
from transformers.modeling_rope_utils import rope_config_validation
class Qwen2_5_VLVisionConfig(PretrainedConfig):
    """Vision-tower configuration for the Qwen2.5-VL backbone used by Wall-X.

    Stores the visual encoder hyper-parameters (patching, attention-window
    layout, hidden sizes). Values default to the Qwen2.5-VL-7B vision tower.
    """

    model_type = "qwen2_5_vl"
    base_config_key = "vision_config"

    def __init__(
        self,
        depth=32,
        hidden_size=3584,
        hidden_act="silu",
        intermediate_size=3420,
        num_heads=16,
        in_channels=3,
        patch_size=14,
        spatial_merge_size=2,
        temporal_patch_size=2,
        tokens_per_second=4,
        window_size=112,
        out_hidden_size=3584,
        fullatt_block_indexes=None,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.depth = depth
        self.hidden_size = hidden_size
        self.hidden_act = hidden_act
        self.intermediate_size = intermediate_size
        self.num_heads = num_heads
        self.in_channels = in_channels
        self.patch_size = patch_size
        self.spatial_merge_size = spatial_merge_size
        self.temporal_patch_size = temporal_patch_size
        self.tokens_per_second = tokens_per_second
        self.window_size = window_size
        # Blocks that use full (non-windowed) attention. Previously this was a
        # mutable default argument (`[7, 15, 23, 31]`), so every config built
        # from defaults shared — and could corrupt — one list object.
        self.fullatt_block_indexes = (
            [7, 15, 23, 31] if fullatt_block_indexes is None else fullatt_block_indexes
        )
        self.out_hidden_size = out_hidden_size
class Qwen2_5_VLConfig(PretrainedConfig):
    r"""
    This is the configuration class to store the configuration of a [`Qwen2_5_VLModel`]. It is used to instantiate a
    Qwen2-VL model according to the specified arguments, defining the model architecture. Instantiating a configuration
    with the defaults will yield a similar configuration to that of
    Qwen2-VL-7B-Instruct [Qwen/Qwen2-VL-7B-Instruct](https://huggingface.co/Qwen/Qwen2-VL-7B-Instruct).
    Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
    documentation from [`PretrainedConfig`] for more information.
    Args:
        vocab_size (`int`, *optional*, defaults to 152064):
            Vocabulary size of the Qwen2_5_VL model. Defines the number of different tokens that can be represented by the
            `inputs_ids` passed when calling [`Qwen2_5_VLModel`]
        hidden_size (`int`, *optional*, defaults to 8192):
            Dimension of the hidden representations.
        intermediate_size (`int`, *optional*, defaults to 29568):
            Dimension of the MLP representations.
        num_hidden_layers (`int`, *optional*, defaults to 80):
            Number of hidden layers in the Transformer encoder.
        num_attention_heads (`int`, *optional*, defaults to 64):
            Number of attention heads for each attention layer in the Transformer encoder.
        num_key_value_heads (`int`, *optional*, defaults to 8):
            This is the number of key_value heads that should be used to implement Grouped Query Attention. If
            `num_key_value_heads=num_attention_heads`, the model will use Multi Head Attention (MHA), if
            `num_key_value_heads=1` the model will use Multi Query Attention (MQA) otherwise GQA is used. When
            converting a multi-head checkpoint to a GQA checkpoint, each group key and value head should be constructed
            by meanpooling all the original heads within that group. For more details checkout [this
            paper](https://arxiv.org/pdf/2305.13245.pdf). If it is not specified, will default to `32`.
        hidden_act (`str` or `function`, *optional*, defaults to `"silu"`):
            The non-linear activation function (function or string) in the decoder.
        max_position_embeddings (`int`, *optional*, defaults to 32768):
            The maximum sequence length that this model might ever be used with.
        initializer_range (`float`, *optional*, defaults to 0.02):
            The standard deviation of the truncated_normal_initializer for initializing all weight matrices.
        rms_norm_eps (`float`, *optional*, defaults to 1e-05):
            The epsilon used by the rms normalization layers.
        use_cache (`bool`, *optional*, defaults to `True`):
            Whether or not the model should return the last key/values attentions (not used by all models). Only
            relevant if `config.is_decoder=True`.
        tie_word_embeddings (`bool`, *optional*, defaults to `False`):
            Whether the model's input and output word embeddings should be tied.
        rope_theta (`float`, *optional*, defaults to 1000000.0):
            The base period of the RoPE embeddings.
        use_sliding_window (`bool`, *optional*, defaults to `False`):
            Whether to use sliding window attention.
        sliding_window (`int`, *optional*, defaults to 4096):
            Sliding window attention (SWA) window size. If not specified, will default to `4096`.
        max_window_layers (`int`, *optional*, defaults to 80):
            The number of layers that use SWA (Sliding Window Attention). The bottom layers use SWA while the top use full attention.
        attention_dropout (`float`, *optional*, defaults to 0.0):
            The dropout ratio for the attention probabilities.
        vision_config (`Dict`, *optional*):
            The config for the visual encoder initialization.
        rope_scaling (`Dict`, *optional*):
            Dictionary containing the scaling configuration for the RoPE embeddings. NOTE: if you apply new rope type
            and you expect the model to work on longer `max_position_embeddings`, we recommend you to update this value
            accordingly.
            Expected contents:
                `rope_type` (`str`):
                    The sub-variant of RoPE to use. Can be one of ['default', 'linear', 'dynamic', 'yarn', 'longrope',
                    'llama3'], with 'default' being the original RoPE implementation.
                `factor` (`float`, *optional*):
                    Used with all rope types except 'default'. The scaling factor to apply to the RoPE embeddings. In
                    most scaling types, a `factor` of x will enable the model to handle sequences of length x *
                    original maximum pre-trained length.
                `original_max_position_embeddings` (`int`, *optional*):
                    Used with 'dynamic', 'longrope' and 'llama3'. The original max position embeddings used during
                    pretraining.
                `attention_factor` (`float`, *optional*):
                    Used with 'yarn' and 'longrope'. The scaling factor to be applied on the attention
                    computation. If unspecified, it defaults to value recommended by the implementation, using the
                    `factor` field to infer the suggested value.
                `beta_fast` (`float`, *optional*):
                    Only used with 'yarn'. Parameter to set the boundary for extrapolation (only) in the linear
                    ramp function. If unspecified, it defaults to 32.
                `beta_slow` (`float`, *optional*):
                    Only used with 'yarn'. Parameter to set the boundary for interpolation (only) in the linear
                    ramp function. If unspecified, it defaults to 1.
                `short_factor` (`List[float]`, *optional*):
                    Only used with 'longrope'. The scaling factor to be applied to short contexts (<
                    `original_max_position_embeddings`). Must be a list of numbers with the same length as the hidden
                    size divided by the number of attention heads divided by 2
                `long_factor` (`List[float]`, *optional*):
                    Only used with 'longrope'. The scaling factor to be applied to long contexts (<
                    `original_max_position_embeddings`). Must be a list of numbers with the same length as the hidden
                    size divided by the number of attention heads divided by 2
                `low_freq_factor` (`float`, *optional*):
                    Only used with 'llama3'. Scaling factor applied to low frequency components of the RoPE
                `high_freq_factor` (`float`, *optional*):
                    Only used with 'llama3'. Scaling factor applied to high frequency components of the RoPE
        num_experts (`int`, *optional*, defaults to 4):
            Wall-X extension: number of experts (stored as-is; consumed by the modeling code).
        experts (*optional*):
            Wall-X extension: expert specification passed through to the model — schema defined by the modeling code.
        dof_config (*optional*):
            Wall-X extension: degrees-of-freedom configuration for cross-embodiment action heads.
        noise_scheduler (*optional*):
            Wall-X extension: noise-scheduler settings for flow-matching action prediction.
        dim_inputs (`tuple`, *optional*, defaults to `(1536, 1536)`):
            Wall-X extension: input dimensions; coerced to a tuple.
        attention_moe (`bool`, *optional*, defaults to `False`):
            Wall-X extension: whether attention layers use mixture-of-experts routing.
        mlp_moe (`bool`, *optional*, defaults to `False`):
            Wall-X extension: whether MLP layers use mixture-of-experts routing.
    ```python
    >>> from transformers import Qwen2_5_VLForConditionalGeneration, Qwen2_5_VLConfig
    >>> # Initializing a Qwen2_5_VL style configuration
    >>> configuration = Qwen2_5_VLConfig()
    >>> # Initializing a model from the Qwen2-VL-7B style configuration
    >>> model = Qwen2_5_VLForConditionalGeneration(configuration)
    >>> # Accessing the model configuration
    >>> configuration = model.config
    ```"""

    model_type = "qwen2_5_vl"
    sub_configs = {"vision_config": Qwen2_5_VLVisionConfig}
    keys_to_ignore_at_inference = ["past_key_values"]
    # Default tensor parallel plan for base model `Qwen2_5_VL`
    base_model_tp_plan = {
        "layers.*.self_attn.q_proj": "colwise",
        "layers.*.self_attn.k_proj": "colwise",
        "layers.*.self_attn.v_proj": "colwise",
        "layers.*.self_attn.o_proj": "rowwise",
        "layers.*.mlp.gate_proj": "colwise",
        "layers.*.mlp.up_proj": "colwise",
        "layers.*.mlp.down_proj": "rowwise",
    }
    base_model_pp_plan = {
        "embed_tokens": (["input_ids"], ["inputs_embeds"]),
        "layers": (["hidden_states", "attention_mask"], ["hidden_states"]),
        "norm": (["hidden_states"], ["hidden_states"]),
    }

    def __init__(
        self,
        vocab_size=152064,
        hidden_size=8192,
        intermediate_size=29568,
        num_hidden_layers=80,
        num_attention_heads=64,
        num_key_value_heads=8,
        hidden_act="silu",
        max_position_embeddings=32768,
        initializer_range=0.02,
        rms_norm_eps=1e-05,
        use_cache=True,
        tie_word_embeddings=False,
        rope_theta=1000000.0,
        use_sliding_window=False,
        sliding_window=4096,
        max_window_layers=80,
        attention_dropout=0.0,
        vision_config=None,
        rope_scaling=None,
        num_experts=4,
        experts=None,
        dof_config=None,
        noise_scheduler=None,
        dim_inputs=(1536, 1536),
        attention_moe=False,
        mlp_moe=False,
        **kwargs,
    ):
        if isinstance(vision_config, dict):
            self.vision_config = self.sub_configs["vision_config"](**vision_config)
        elif vision_config is None:
            self.vision_config = self.sub_configs["vision_config"]()
        else:
            # An already-constructed vision config object. Previously this
            # case fell through the if/elif and left `self.vision_config`
            # unset, breaking any later attribute access.
            self.vision_config = vision_config
        self.vocab_size = vocab_size
        self.max_position_embeddings = max_position_embeddings
        self.hidden_size = hidden_size
        self.intermediate_size = intermediate_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.use_sliding_window = use_sliding_window
        self.sliding_window = sliding_window
        self.max_window_layers = max_window_layers
        # for backward compatibility
        if num_key_value_heads is None:
            num_key_value_heads = num_attention_heads
        self.num_key_value_heads = num_key_value_heads
        self.hidden_act = hidden_act
        self.initializer_range = initializer_range
        self.rms_norm_eps = rms_norm_eps
        self.use_cache = use_cache
        self.rope_theta = rope_theta
        self.attention_dropout = attention_dropout
        self.rope_scaling = rope_scaling
        # Wall-X-specific extensions (MoE routing, action/DoF settings);
        # stored verbatim for the modeling code to interpret.
        self.num_experts = num_experts
        self.experts = experts
        self.dof_config = dof_config
        self.noise_scheduler = noise_scheduler
        self.dim_inputs = tuple(dim_inputs)
        self.attention_moe = attention_moe
        self.mlp_moe = mlp_moe

        # Validate the correctness of rotary position embeddings parameters
        # BC: if there is a 'type' field, move it to 'rope_type'.
        # and change type from 'mrope' to 'default' because `mrope` does defeault RoPE calculations
        # one can set it to "linear"/"dynamic" etc. to have scaled RoPE
        # TODO: @raushan update config in the hub
        if self.rope_scaling is not None and "type" in self.rope_scaling:
            if self.rope_scaling["type"] == "mrope":
                self.rope_scaling["type"] = "default"
            self.rope_scaling["rope_type"] = self.rope_scaling["type"]
        rope_config_validation(self, ignore_keys={"mrope_section"})

        super().__init__(tie_word_embeddings=tie_word_embeddings, **kwargs)
__all__ = ["Qwen2_5_VLConfig"]
File diff suppressed because it is too large Load Diff
+664
View File
@@ -0,0 +1,664 @@
#!/usr/bin/env python
# Copyright 2025 HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Wall-X Utility Functions.
Contains data processing utilities, text formatting functions, and helper classes
for the Wall-X cross-embodiment robotic control model.
"""
import json
import random
import re
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, Union
import torch
from transformers import BatchFeature
from lerobot.policies.wall_x.constant import (
CAMERA_NAME_MAPPING,
)
from lerobot.utils.constants import OBS_IMAGES
@dataclass
class X2RDataProcessingConfig:
"""Configuration class for X2R data processing pipeline.
This class contains all the necessary parameters for processing robotic data
including camera mappings, tactile sensor configurations, action predictions,
and various processing options.
"""
# Action prediction configuration
predict_action_keys: List[str] = field(default_factory=list)
obs_action_keys: List[str] = field(default_factory=list)
# Image resolution settings for different views
resolution: Dict[str, int] = field(
default_factory=lambda: {
"face_view": -1,
"left_wrist_view": 128,
"right_wrist_view": 128,
}
)
# Dataset splitting
train_test_split: float = 0.9
split_seed: int = 42
# Instruction handling
priority_order: Optional[Dict[str, float]] = None
# Vision model parameters
model_type: str = "qwen2_5"
max_pixels: int = 16384 * 28 * 28
min_pixels: int = 4 * 28 * 28
image_factor: int = 28
generate_subtask_ratio: float = 0.0
def __post_init__(self):
"""Post-initialization validation and setup."""
# Validate train/test split
if not 0 < self.train_test_split < 1:
raise ValueError(
f"train_test_split must be between 0 and 1, got {self.train_test_split}"
)
def as_dict(self) -> Dict:
"""Convert configuration to dictionary format.
Returns:
Dict: Configuration as dictionary
"""
return self.__dict__
def update(self, **kwargs) -> "X2RDataProcessingConfig":
"""Update configuration parameters.
Args:
**kwargs: Key-value pairs to update
Returns:
X2RDataProcessingConfig: Updated configuration instance
"""
for key, value in kwargs.items():
if hasattr(self, key):
setattr(self, key, value)
else:
raise ValueError(f"Unknown configuration parameter: {key}")
return self
def preprocesser_call(
    processor,
    images: Optional[Union[List, Any]] = None,
    text: Optional[Union[str, List[str]]] = None,
    videos: Optional[Union[List, Any]] = None,
    padding: Union[bool, str] = False,
    truncation: Optional[bool] = None,
    max_length: Optional[int] = None,
    return_tensors: str = "pt",
) -> BatchFeature:
    """Unified preprocessing function for Wall-X handling text, image and video inputs.

    Processes inputs into a format suitable for multimodal transformer models:
    - text tokenization and special-token expansion,
    - image/video processing through the processor's image processor,
    - label generation that keeps loss only on assistant responses,
    - padding and truncation handling.

    Args:
        processor: Multimodal processor exposing ``tokenizer`` and ``image_processor``.
        images: Input images (PIL, numpy arrays, or torch tensors).
        text: Text or list of texts to tokenize.
        videos: Input videos (numpy arrays or torch tensors).
        padding: Whether/how to pad sequences to the same length.
        truncation: Whether to truncate sequences longer than ``max_length``.
        max_length: Maximum length used for truncation/padding.
        return_tensors: Format of returned tensors ('pt', 'np', ...).

    Returns:
        BatchFeature with ``input_ids``, ``attention_mask``, ``labels`` and,
        when present, ``pixel_values`` / ``pixel_values_videos`` plus their
        ``image_grid_thw`` / ``video_grid_thw`` grid shapes.
    """
    # Process image inputs.
    if images is not None and len(images) > 0:
        image_inputs = processor.image_processor(
            images=images, videos=None, return_tensors=return_tensors
        )
        image_grid_thw = image_inputs["image_grid_thw"]
    else:
        image_inputs = {}
        image_grid_thw = None
    # Process video inputs.
    if videos is not None:
        videos_inputs = processor.image_processor(
            images=None, videos=videos, return_tensors=return_tensors
        )
        video_grid_thw = videos_inputs["video_grid_thw"]
    else:
        videos_inputs = {}
        video_grid_thw = None
    # Ensure text input is in list format.
    if not isinstance(text, list):
        text = [text]
    # Expand each image placeholder into the number of visual tokens the
    # vision tower will actually emit for that image (grid cells / merge^2).
    if image_grid_thw is not None:
        merge_length = processor.image_processor.merge_size**2
        index = 0
        for i in range(len(text)):
            while "<|image_pad|>" in text[i]:
                # Bounds check: tolerate more placeholders than images.
                if index >= len(image_grid_thw):
                    print(
                        f"Warning: Number of image placeholders ({index + 1}) "
                        f"exceeds actual images ({len(image_grid_thw)}), "
                        f"skipping remaining placeholder processing"
                    )
                    break
                # Replace image placeholder with actual token count.
                token_count = image_grid_thw[index].prod() // merge_length
                text[i] = text[i].replace(
                    "<|image_pad|>", "<|placeholder|>" * token_count, 1
                )
                index += 1
            text[i] = text[i].replace("<|placeholder|>", "<|image_pad|>")
    # Same expansion for video placeholders (no bounds check in this path).
    if video_grid_thw is not None:
        merge_length = processor.image_processor.merge_size**2
        index = 0
        for i in range(len(text)):
            while "<|video_pad|>" in text[i]:
                # Replace video placeholder with actual token count.
                token_count = video_grid_thw[index].prod() // merge_length
                text[i] = text[i].replace(
                    "<|video_pad|>", "<|placeholder|>" * token_count, 1
                )
                index += 1
            text[i] = text[i].replace("<|placeholder|>", "<|video_pad|>")
    # Tokenize the fully expanded input text.
    text_inputs = processor.tokenizer(
        text,
        return_tensors=return_tensors,
        padding=padding,
        truncation=truncation,
        max_length=max_length,
    )
    # Resolve the pad id, falling back to EOS when no pad token is defined.
    pad_token_id = processor.tokenizer.pad_token_id
    if pad_token_id is None:
        pad_token_id = processor.tokenizer.eos_token_id
    # Labels default to -100 (ignored by cross entropy); only assistant
    # response spans receive real token ids below.
    labels = torch.full_like(text_inputs.input_ids, -100)
    assistant_marker = "<|im_start|>assistant\n"
    im_end_token_id = processor.tokenizer.convert_tokens_to_ids("<|im_end|>")
    # Reuse the marker string so the literal and its token ids cannot drift.
    assistant_tokens = processor.tokenizer(
        assistant_marker, add_special_tokens=False
    ).input_ids
    for i in range(len(text)):
        assistant_regions = []
        parts = text[i].split(assistant_marker)
        # Count left padding tokens so token offsets line up with the text.
        num_left_pads = 0
        for token_id in text_inputs.input_ids[i]:
            if token_id == pad_token_id:
                num_left_pads += 1
            else:
                break
        current_pos = num_left_pads
        for j, part in enumerate(parts):
            part_tokens = processor.tokenizer(part, add_special_tokens=False).input_ids
            if j == 0:
                # Everything before the first assistant marker stays masked.
                current_pos += len(part_tokens)
                continue
            # Each later part starts with an assistant response; find its
            # terminating <|im_end|> to delimit the supervised region.
            for k in range(current_pos + 1, len(text_inputs.input_ids[i])):
                if text_inputs.input_ids[i][k] == im_end_token_id:
                    # NOTE(review): the "+ 2" end offset (past <|im_end|>)
                    # and the "+ 3" marker advance below look tokenizer /
                    # chat-template specific — confirm against the Qwen
                    # chat template before changing.
                    assistant_regions.append(
                        (current_pos + len(assistant_tokens), k + 2)
                    )
                    break
            current_pos += len(part_tokens) + 3
        # Copy real token ids into the supervised assistant regions.
        for start, end in assistant_regions:
            labels[i][start:end] = text_inputs.input_ids[i][start:end]
    # Never supervise the action / proprioception placeholder tokens.
    action_token_id = processor.tokenizer.encode("<|action|>")[0]
    propri_token_id = processor.tokenizer.encode("<|propri|>")[0]
    labels[labels == action_token_id] = -100
    labels[labels == propri_token_id] = -100
    # FIX: use the resolved pad_token_id (with its EOS fallback) instead of
    # the raw processor.tokenizer.pad_token_id, which may be None — the old
    # comparison against None silently masked nothing, leaving padding
    # positions supervised when the tokenizer pads with EOS.
    labels[labels == pad_token_id] = -100
    # Drop labels entirely when nothing is supervised so callers can skip
    # the cross-entropy loss.
    if (labels != -100).any().item():
        text_inputs["labels"] = labels
    else:
        text_inputs["labels"] = None
    return BatchFeature(data={**text_inputs, **image_inputs, **videos_inputs})
def process_grounding_points(
    text: str,
    orig_height: int,
    orig_width: int,
    resized_height: int,
    resized_width: int,
    model_type: str,
) -> str:
    """Rescale <point> coordinates in text to match a resized image.

    For "qwen2_5" coordinates are mapped to pixel positions in the resized
    image; for "qwen2" they are normalized into the [0, 1000) range. Point
    tags whose contents cannot be processed are left untouched.

    Args:
        text: Input text containing <point> tags with coordinates.
        orig_height: Original image height.
        orig_width: Original image width.
        resized_height: Resized image height.
        resized_width: Resized image width.
        model_type: Coordinate convention, 'qwen2' or 'qwen2_5'.

    Returns:
        Text with adjusted coordinate values.
    """
    point_tag_re = re.compile(r"<point>(.*?)</point>")

    def _rescale(match):
        """Rescale one <point> match; on any failure return it unchanged."""
        raw = match.group(1)
        try:
            values = [int(v) for v in re.findall(r"\d+", raw)]
            # Pixel scale factors for the qwen2_5 convention.
            width_ratio = resized_width / orig_width
            height_ratio = resized_height / orig_height

            def to_pixel(v, ratio, limit):
                # Clamp into the valid pixel range of the resized image.
                return max(0, min(round(v * ratio), limit - 1))

            def to_permille(v, extent):
                # Normalize into [0, 1000) as used by qwen2.
                return max(0, min(999.999, (v / extent) * 1000))

            if len(values) == 2:
                x, y = values
                if model_type == "qwen2_5":
                    values = [
                        to_pixel(x, width_ratio, resized_width),
                        to_pixel(y, height_ratio, resized_height),
                    ]
                elif model_type == "qwen2":
                    values = [to_permille(x, orig_width), to_permille(y, orig_height)]
                else:
                    raise ValueError(f"Unsupported model type: {model_type}")
            elif len(values) == 4:
                x1, y1, x2, y2 = values
                if model_type == "qwen2_5":
                    values = [
                        to_pixel(x1, width_ratio, resized_width),
                        to_pixel(y1, height_ratio, resized_height),
                        to_pixel(x2, width_ratio, resized_width),
                        to_pixel(y2, height_ratio, resized_height),
                    ]
                elif model_type == "qwen2":
                    values = [
                        to_permille(x1, orig_width),
                        to_permille(y1, orig_height),
                        to_permille(x2, orig_width),
                        to_permille(y2, orig_height),
                    ]
                else:
                    raise ValueError(f"Unsupported model type: {model_type}")
            # Other coordinate counts pass through with only reformatting.
            return f'<point>[{", ".join(map(str, values))}]</point>'
        except (ValueError, TypeError):
            return match.group(0)

    return point_tag_re.sub(_rescale, text)
def get_frame_instruction(
    instruction_info: Dict[str, Any],
    frame_idx: Optional[int] = None,
    truncate_keys: Optional[List[str]] = None,
) -> Tuple[Dict[str, Any], Optional[int]]:
    """Select the instruction entries that apply to a given frame.

    Dict-valued entries map "start end" frame ranges to per-range
    instructions; the first window covering ``frame_idx`` is selected.
    Scalar entries are copied through unchanged.

    Args:
        instruction_info: Mapping of instruction keys to either plain values
            or {"start end": instruction} range dictionaries.
        frame_idx: Frame index to resolve range entries against.
        truncate_keys: Keys whose first matched range also fixes the
            truncation point (defaults to the subtask/distribute keys).

    Returns:
        Tuple of (per-frame instruction dict, end frame + 1 of the first
        matched truncate-key range, or None).
    """
    if truncate_keys is None:
        truncate_keys = [
            "subtask_generation",
            "distribute",
            "subtask_generation_zh",
            "distribute_zh",
        ]

    selected: Dict[str, Any] = {}
    split_end: Optional[int] = None
    for key, value in instruction_info.items():
        if not isinstance(value, dict):
            selected[key] = value
            continue
        # Range entry: pick the first window containing this frame; a
        # degenerate "n n" window still matches frame n exactly.
        for frame_range, ranged_instruction in value.items():
            start, end = (int(tok) for tok in frame_range.split(" "))
            if start <= frame_idx < end or start == frame_idx:
                selected[key] = ranged_instruction
                if split_end is None and key in truncate_keys:
                    split_end = end + 1
                break
    return selected, split_end
def get_task_instruction(
    frame_instruction_info: Dict[str, Any], priority_order: Optional[OrderedDict] = None
) -> str:
    """Pick the task instruction from the highest-priority available field.

    Walks ``priority_order`` (defaulting to the subtask/distribute fields)
    and returns the first non-empty entry, prefixed with a newline. Falls
    back to the plain "instruction" field when no priority field is set.

    NOTE(review): the original sampling loop broke out immediately after the
    first non-empty match, so the per-key probability values never took
    effect; first-match semantics are preserved here.

    Args:
        frame_instruction_info: Dictionary containing instruction fields.
        priority_order: Optional mapping fixing the key walk order.

    Returns:
        The selected instruction string ("" when nothing is available).
    """
    if priority_order is not None:
        keys_in_order = OrderedDict(priority_order)
    else:
        # Default priority settings.
        keys_in_order = OrderedDict(
            {
                "subtask_generation": 0.25,
                "subtask_generation_zh": 0.25,
                "distribute": 0.25,
                "distribute_zh": 0.25,
            }
        )

    for key in keys_in_order:
        candidate = frame_instruction_info.get(key, "")
        if candidate != "":
            return f"\n{candidate}"

    # Fall back to the base instruction when no priority field matched.
    return frame_instruction_info.get("instruction", "")
def get_wallx_normal_text(
    instruction_info: Dict[str, Any],
    action_chunk_size: int,
    frame_idx: int,
    priority_order: Optional[OrderedDict] = None,
    img_keys: Optional[List[str]] = None,
    generate_subtask_ratio: float = 0.0,
) -> Tuple[str, bool]:
    """Assemble the full chat-formatted prompt for a Wall-X sample.

    The prompt is a system message, a user turn holding one image-placeholder
    span per camera plus the task instruction, and an assistant turn that is
    either a language subtask (VQA-style) or the action placeholder tokens.

    Args:
        instruction_info: Dictionary containing instruction components.
        action_chunk_size: Number of <|action|> tokens to emit.
        frame_idx: Current frame index.
        priority_order: Priority order for instruction sampling.
        img_keys: List of image observation keys.
        generate_subtask_ratio: Probability of producing a subtask target
            instead of action tokens.

    Returns:
        Tuple of (formatted_prompt_text, is_subtask_generation).
    """
    # Special formatting tokens.
    im_start = "<|im_start|>"
    im_end = "<|im_end|>"
    vis_start = "<|vision_start|>"
    vis_end = "<|vision_end|>"
    img_pad = "<|image_pad|>"
    propri = "<|propri|>"
    action = "<|action|>"
    action_fast = "<|action_fast|>"

    system_turn = f"{im_start}system\nYou are a helpful assistant.{im_end}\n"

    # User turn: one placeholder span per camera view, then the instruction.
    observation = f"{im_start}user\nObservation:"
    if img_keys:
        for cam_name in img_key_mapping(img_keys):
            observation += f" {cam_name}: {vis_start}{img_pad}{vis_end}"
    observation += "\nInstruction:"

    frame_info, _ = get_frame_instruction(instruction_info, frame_idx=frame_idx)

    subtask_keys = ["subtask_generation", "distribute"]
    # Only roll the dice when a subtask target actually exists (the
    # short-circuit keeps the RNG stream identical to the original).
    is_subtask = (
        bool(set(frame_info.keys()) & set(subtask_keys))
        and random.random() < generate_subtask_ratio
    )

    if is_subtask:
        # VQA-style target: the assistant answers with the subtask text.
        base_instruction = frame_info.get("instruction", "")
        user_turn = (
            f"{observation} {base_instruction}"
            "\nPredict the next action in language.\n"
            f"{im_end}\n"
        )
        target_text = next(frame_info[k] for k in subtask_keys if k in frame_info)
        assistant_turn = f"{im_start}assistant\n{target_text}\n{im_end}"
    else:
        # Action target: proprioception prompt plus action placeholders.
        task_instruction = get_task_instruction(
            frame_info, priority_order=priority_order
        )
        user_turn = (
            f"{observation} {task_instruction}"
            f"\nPredict the next action in robot action.\nProprioception: {propri}\n"
            f"{im_end}\n"
        )
        assistant_turn = (
            f"{im_start}assistant\n{action_fast}{im_end}\n"
            f"{action * action_chunk_size}"
        )

    return system_turn + user_turn + assistant_turn, is_subtask
def img_key_mapping(img_keys: List[str]) -> List[str]:
    """Map raw observation image keys to human-readable camera names.

    The OBS_IMAGES prefix is stripped, known keys are looked up in
    CAMERA_NAME_MAPPING, and unknown keys are turned into a readable
    "<name> view" label.

    Args:
        img_keys: List of image observation keys.

    Returns:
        List of camera display names, one per input key.
    """
    prefix = OBS_IMAGES + "."
    mapped = []
    for raw_key in img_keys:
        name = raw_key.replace(prefix, "")
        if name in CAMERA_NAME_MAPPING:
            mapped.append(CAMERA_NAME_MAPPING[name])
        elif 'view' in name:
            # Keys already naming a view just get spaces instead of
            # underscores.
            mapped.append(name.replace('_', ' '))
        else:
            mapped.append(name + " view")
    return mapped
def get_action_tokens(
    normalized_actions: Union[torch.Tensor, List], action_tokenizer
) -> List[List[str]]:
    """Tokenize normalized actions into "<|action_token_i|>" strings.

    Args:
        normalized_actions: Batch of normalized actions, either a tensor or
            a list; tensors are moved to CPU numpy before tokenization.
        action_tokenizer: Callable mapping one action array to token ids;
            only the first row of its result is used.

    Returns:
        One list of action-token strings per input sample.
    """
    if isinstance(normalized_actions, torch.Tensor):
        normalized_actions = normalized_actions.cpu().numpy()

    token_strs_per_sample: List[List[str]] = []
    for idx in range(len(normalized_actions)):
        sample = normalized_actions[idx]
        if isinstance(sample, torch.Tensor):
            # Convert in place so list inputs mirror the tensor path.
            sample = sample.cpu().numpy()
            normalized_actions[idx] = sample
        ids = action_tokenizer(sample)
        token_strs_per_sample.append([f"<|action_token_{tid}|>" for tid in ids[0]])
    return token_strs_per_sample
def pad_action_token_strs(
    actions_token_lists: List[List[str]], pad_token: str = "<|endoftext|>"
) -> List[str]:
    """Right-pad per-sample action token lists and join them into strings.

    Each sample's tokens are terminated with "<|im_end|>\\n" and then padded
    with ``pad_token`` until every sample reaches the length of the longest
    one.

    Args:
        actions_token_lists: Action token lists, one list per sample.
        pad_token: Token string appended as padding.

    Returns:
        One joined string per sample. An empty input yields an empty list
        (previously ``max()`` raised ValueError on an empty sequence).
    """
    # Robustness fix: an empty batch used to crash in max().
    if not actions_token_lists:
        return []
    max_len = max(len(tokens) for tokens in actions_token_lists)
    padded_action_strs = []
    for tokens in actions_token_lists:
        pieces = tokens + ["<|im_end|>\n"] + [pad_token] * (max_len - len(tokens))
        padded_action_strs.append("".join(pieces))
    return padded_action_strs
def replace_action_token(
    text: List[str],
    norm_action: Optional[torch.Tensor],
    action_tokenizer,
    dof_masks: Optional[torch.Tensor] = None,
    max_action_len: int = 32,
) -> List[str]:
    """Replace action placeholders in text with actual action tokens.

    Args:
        text: Prompt strings containing "<|action_fast|>" / "<|action|>"
            placeholders.
        norm_action: Normalized action tensors, one per sample (or None).
        action_tokenizer: Callable converting an action array to token ids
            (or None).
        dof_masks: Masks selecting the active degrees of freedom per sample;
            required when a tokenizer and actions are provided.
        max_action_len: Maximum number of action steps fed to the tokenizer
            (generalizes the previously hard-coded 32).

    Returns:
        Prompt strings with placeholders replaced, or with the fast-token
        span stripped when no tokenizer/actions are available.
    """
    if action_tokenizer is not None and norm_action is not None:
        # Truncate each action chunk and keep only the active DOF columns.
        norm_action = [
            action[:max_action_len, dof_masks[i, 0].bool()]
            for i, action in enumerate(norm_action)
        ]
        # Tokenize and right-pad so every sample has the same token count.
        actions_fast_tokens = get_action_tokens(norm_action, action_tokenizer)
        actions_fast_token_strs = pad_action_token_strs(actions_fast_tokens)
        # Substitute each "<|action_fast|>" span with its token string.
        actions_fast_token_idx = 0
        for i in range(len(text)):
            if "<|action_fast|>" in text[i]:
                text[i] = text[i].replace(
                    "<|action_fast|><|im_end|>\n",
                    actions_fast_token_strs[actions_fast_token_idx],
                )
                actions_fast_token_idx += 1
        # Remove the remaining flow-matching action placeholders.
        text = [t.replace("<|action|>", "") for t in text]
    else:
        # No tokenizer/actions available: strip only the fast-token span.
        # NOTE(review): "<|action|>" placeholders are kept in this branch —
        # confirm downstream consumers rely on that asymmetry.
        text = [t.replace("<|action_fast|><|im_end|>\n", "") for t in text]
    return text
+2
View File
@@ -0,0 +1,2 @@
# Wall-X policy tests
+126
View File
@@ -0,0 +1,126 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test script to verify Wall-X policy integration with LeRobot, only meant to be run locally!"""
import os
import pytest
import torch
# Skip this entire module in CI: it needs a local Wall-X install and a CUDA
# device (the smoke test below builds its config with device='cuda').
# Both generic CI runners and GitHub Actions export these env vars.
pytestmark = pytest.mark.skipif(
    os.environ.get("CI") == "true" or os.environ.get("GITHUB_ACTIONS") == "true",
    reason="This test requires local Wall-X installation and is not meant for CI",
)
from lerobot.policies.factory import make_policy_config # noqa: E402
from lerobot.policies.wall_x import ( # noqa: E402
WallXConfig,
WallXPolicy,
make_wall_x_pre_post_processors, # noqa: E402
)
from lerobot.utils.random_utils import set_seed # noqa: E402
def test_policy_instantiation():
    """Smoke-test WallXPolicy: build config + processors, then run forward()
    and select_action() on a dummy batch."""
    set_seed(42)
    config = WallXConfig(device='cuda')

    # Describe the policy I/O: a 7-DoF state/action pair plus one RGB camera.
    from lerobot.configs.types import FeatureType, PolicyFeature

    config.input_features = {
        "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(7,)),
        "observation.images.face_view": PolicyFeature(
            type=FeatureType.VISUAL, shape=(3, 224, 224)
        ),
    }
    config.output_features = {
        "action": PolicyFeature(type=FeatureType.ACTION, shape=(7,)),
    }

    # Identity normalization stats (zero mean, unit std) for the processors.
    dataset_stats = {
        "observation.state": {"mean": torch.zeros(7), "std": torch.ones(7)},
        "action": {"mean": torch.zeros(7), "std": torch.ones(7)},
        "observation.images.face_view": {
            "mean": torch.zeros(3, 224, 224),
            "std": torch.ones(3, 224, 224),
        },
    }

    policy = WallXPolicy(config)
    preprocessor, postprocessor = make_wall_x_pre_post_processors(
        config=config, dataset_stats=dataset_stats
    )

    # Dummy batch; torch.rand keeps image pixels in the expected [0, 1] range.
    batch_size = 1
    device = config.device
    batch = {
        "observation.state": torch.randn(
            batch_size, 7, dtype=torch.float32, device=device
        ),
        "action": torch.randn(
            batch_size, config.chunk_size, 7, dtype=torch.float32, device=device
        ),
        "observation.images.face_view": torch.rand(
            batch_size, 3, 224, 224, dtype=torch.float32, device=device
        ),
        "task": ["Pick up the object"] * batch_size,
    }
    batch = preprocessor(batch)

    # Training path: forward() must produce a scalar loss.
    try:
        loss, loss_dict = policy.forward(batch)
        print(f"Forward pass successful. Loss: {loss_dict['loss']:.4f}")
    except Exception as e:
        print(f"Forward pass failed: {e}")
        raise

    # Inference path: select_action() + postprocessing must yield an action.
    try:
        with torch.no_grad():
            action = policy.select_action(batch)
            action = postprocessor(action)
        print(f"Action: {action}")
        print(f"Action prediction successful. Action shape: {action.shape}")
    except Exception as e:
        print(f"Action prediction failed: {e}")
        raise
def test_config_creation():
    """Verify the 'wall_x' policy type is registered with the config factory."""
    try:
        config = make_policy_config(policy_type="wall_x")
    except Exception as e:
        print(f"Config creation failed: {e}")
        raise
    else:
        # Reached only when the factory lookup succeeded.
        print("Config created successfully through factory")
        print(f" Config type: {type(config).__name__}")
# Allow running the smoke tests directly (bypassing pytest collection).
if __name__ == "__main__":
    test_policy_instantiation()
    test_config_creation()