Compare commits


1 Commit

Author SHA1 Message Date
Martino Russi 6c9d8e9de1 Add custom teleop 2025-11-04 14:58:43 +01:00
13 changed files with 374 additions and 1483 deletions
+18 -14
@@ -39,7 +39,6 @@ from lerobot.datasets.aggregate import aggregate_datasets
 from lerobot.datasets.compute_stats import aggregate_stats
 from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
 from lerobot.datasets.utils import (
-    DATA_DIR,
     DEFAULT_CHUNK_SIZE,
     DEFAULT_DATA_FILE_SIZE_IN_MB,
     DEFAULT_DATA_PATH,
@@ -963,23 +962,28 @@ def _copy_data_with_feature_changes(
     remove_features: list[str] | None = None,
 ) -> None:
     """Copy data while adding or removing features."""
-    data_dir = dataset.root / DATA_DIR
-    parquet_files = sorted(data_dir.glob("*/*.parquet"))
+    if dataset.meta.episodes is None:
+        dataset.meta.episodes = load_episodes(dataset.meta.root)
 
-    if not parquet_files:
-        raise ValueError(f"No parquet files found in {data_dir}")
+    # Map file paths to episode indices to extract chunk/file indices
+    file_to_episodes: dict[Path, set[int]] = {}
+    for ep_idx in range(dataset.meta.total_episodes):
+        file_path = dataset.meta.get_data_file_path(ep_idx)
+        if file_path not in file_to_episodes:
+            file_to_episodes[file_path] = set()
+        file_to_episodes[file_path].add(ep_idx)
 
     frame_idx = 0
-    for src_path in tqdm(parquet_files, desc="Processing data files"):
-        df = pd.read_parquet(src_path).reset_index(drop=True)
+    for src_path in tqdm(sorted(file_to_episodes.keys()), desc="Processing data files"):
+        df = pd.read_parquet(dataset.root / src_path).reset_index(drop=True)
 
-        relative_path = src_path.relative_to(dataset.root)
-        chunk_dir = relative_path.parts[1]
-        file_name = relative_path.parts[2]
-        chunk_idx = int(chunk_dir.split("-")[1])
-        file_idx = int(file_name.split("-")[1].split(".")[0])
+        # Get chunk_idx and file_idx from the source file's first episode
+        episodes_in_file = file_to_episodes[src_path]
+        first_ep_idx = min(episodes_in_file)
+        src_ep = dataset.meta.episodes[first_ep_idx]
+        chunk_idx = src_ep["data/chunk_index"]
+        file_idx = src_ep["data/file_index"]
 
         if remove_features:
             df = df.drop(columns=remove_features, errors="ignore")
@@ -1005,7 +1009,7 @@ def _copy_data_with_feature_changes(
             df[feature_name] = feature_slice
         frame_idx = end_idx
 
-        # Write using the same chunk/file structure as source
+        # Write using the preserved chunk_idx and file_idx from source
         dst_path = new_meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
         dst_path.parent.mkdir(parents=True, exist_ok=True)
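The core change in this hunk swaps path parsing for a metadata lookup: instead of deriving chunk_idx/file_idx from directory names like chunk-000/file-012.parquet, they are read from the episode records, so a rewritten dataset keeps the source file placement. A minimal standalone sketch of both derivations (the path and metadata values here are hypothetical):

from pathlib import Path

# Old approach: parse the indices out of a relative path such as
# "data/chunk-000/file-012.parquet" (hypothetical example path).
rel = Path("data/chunk-000/file-012.parquet")
chunk_idx = int(rel.parts[1].split("-")[1])               # "chunk-000" -> 0
file_idx = int(rel.parts[2].split("-")[1].split(".")[0])  # "file-012.parquet" -> 12

# New approach: read the indices from episode metadata, here mocked as a plain
# dict standing in for dataset.meta.episodes.
episodes = {7: {"data/chunk_index": 0, "data/file_index": 12}}
first_ep_idx = min({7})  # first episode stored in this file
src_ep = episodes[first_ep_idx]
assert (src_ep["data/chunk_index"], src_ep["data/file_index"]) == (chunk_idx, file_idx)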
@@ -45,7 +45,7 @@ class DiffusionConfig(PreTrainedConfig):
     Args:
         n_obs_steps: Number of environment steps worth of observations to pass to the policy (takes the
             current step and additional steps going back).
-        chunk_size: Diffusion model action prediction size as detailed in `DiffusionPolicy.select_action`.
+        horizon: Diffusion model action prediction size as detailed in `DiffusionPolicy.select_action`.
         n_action_steps: The number of action steps to run in the environment for one invocation of the policy.
             See `DiffusionPolicy.select_action` for more details.
         input_shapes: A dictionary defining the shapes of the input data for the policy. The key represents
@@ -105,7 +105,7 @@ class DiffusionConfig(PreTrainedConfig):
     # Inputs / output structure.
     n_obs_steps: int = 2
-    chunk_size: int = 16
+    horizon: int = 16
     n_action_steps: int = 8
 
     normalization_mapping: dict[str, NormalizationMode] = field(
@@ -118,7 +118,7 @@ class DiffusionConfig(PreTrainedConfig):
     # The original implementation doesn't sample frames for the last 7 steps,
     # which avoids excessive padding and leads to improved training results.
-    drop_n_last_frames: int = 7  # chunk_size - n_action_steps - n_obs_steps + 1
+    drop_n_last_frames: int = 7  # horizon - n_action_steps - n_obs_steps + 1
 
     # Architecture / modeling.
     # Vision backbone.
@@ -180,13 +180,13 @@ class DiffusionConfig(PreTrainedConfig):
                 f"Got {self.noise_scheduler_type}."
             )
 
-        # Check that the chunk size and U-Net downsampling is compatible.
+        # Check that the horizon size and U-Net downsampling is compatible.
         # U-Net downsamples by 2 with each stage.
         downsampling_factor = 2 ** len(self.down_dims)
-        if self.chunk_size % downsampling_factor != 0:
+        if self.horizon % downsampling_factor != 0:
             raise ValueError(
-                "The chunk_size should be an integer multiple of the downsampling factor (which is determined "
-                f"by `len(down_dims)`). Got {self.chunk_size=} and {self.down_dims=}"
+                "The horizon should be an integer multiple of the downsampling factor (which is determined "
+                f"by `len(down_dims)`). Got {self.horizon=} and {self.down_dims=}"
             )
 
     def get_optimizer_preset(self) -> AdamConfig:
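A quick sanity check of the divisibility constraint enforced above, using the `horizon = 16` default from this diff; the three-stage `down_dims` value is an assumption, since its default is not shown here:

down_dims = (512, 1024, 2048)  # assumed three-stage U-Net, not shown in this diff
downsampling_factor = 2 ** len(down_dims)  # each stage halves the sequence: 2**3 = 8
horizon = 16  # default from this diff

assert horizon % downsampling_factor == 0  # 16 % 8 == 0, so this config passes the check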
@@ -231,7 +231,7 @@ class DiffusionConfig(PreTrainedConfig):
     @property
     def action_delta_indices(self) -> list:
-        return list(range(1 - self.n_obs_steps, 1 - self.n_obs_steps + self.chunk_size))
+        return list(range(1 - self.n_obs_steps, 1 - self.n_obs_steps + self.horizon))
 
     @property
     def reward_delta_indices(self) -> None:
@@ -99,25 +99,25 @@ class DiffusionPolicy(PreTrainedPolicy):
         return actions
 
     @torch.no_grad()
-    def select_action(self, batch: dict[str, Tensor], noise: Tensor | None = None, **kwargs) -> Tensor:
+    def select_action(self, batch: dict[str, Tensor], noise: Tensor | None = None) -> Tensor:
         """Select a single action given environment observations.
 
         This method handles caching a history of observations and an action trajectory generated by the
         underlying diffusion model. Here's how it works:
           - `n_obs_steps` steps worth of observations are cached (for the first steps, the observation is
             copied `n_obs_steps` times to fill the cache).
-          - The diffusion model generates `chunk_size` steps worth of actions.
+          - The diffusion model generates `horizon` steps worth of actions.
           - `n_action_steps` worth of actions are actually kept for execution, starting from the current step.
 
         Schematically this looks like:
             ----------------------------------------------------------------------------------------------
-            (legend: o = n_obs_steps, c = chunk_size, a = n_action_steps)
+            (legend: o = n_obs_steps, h = horizon, a = n_action_steps)
             |timestep            | n-o+1 | n-o+2 | ..... | n     | ..... | n+a-1 | n+a   | ..... | n-o+h |
             |observation is used | YES   | YES   | YES   | YES   | NO    | NO    | NO    | NO    | NO    |
             |action is generated | YES   | YES   | YES   | YES   | YES   | YES   | YES   | YES   | YES   |
             |action is used      | NO    | NO    | NO    | YES   | YES   | YES   | NO    | NO    | NO    |
             ----------------------------------------------------------------------------------------------
-        Note that this means we require: `n_action_steps <= chunk_size - n_obs_steps + 1`. Also, note that
-        this period is
+        Note that this means we require: `n_action_steps <= horizon - n_obs_steps + 1`. Also, note that
+        "horizon" may not be the best name to describe what the variable actually means, because this period is
         actually measured from the first observation which (if `n_obs_steps` > 1) happened in the past.
         """
         # NOTE: for offline evaluation, we have action in the batch, so we need to pop it out
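To make the requirement concrete, a quick check with the defaults appearing in this diff (`n_obs_steps=2`, `horizon=16`, `n_action_steps=8`), together with the offsets defined by `action_delta_indices` above:

n_obs_steps, horizon, n_action_steps = 2, 16, 8  # defaults from this diff

# Of the horizon generated steps, only horizon - n_obs_steps + 1 fall at or after
# the current step, which is the ceiling on n_action_steps.
assert n_action_steps <= horizon - n_obs_steps + 1  # 8 <= 15

# Offsets relative to the current step, mirroring action_delta_indices above.
delta_indices = list(range(1 - n_obs_steps, 1 - n_obs_steps + horizon))
assert delta_indices[0] == -1 and delta_indices[-1] == 14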
@@ -213,7 +213,7 @@ class DiffusionModel(nn.Module):
             noise
             if noise is not None
             else torch.randn(
-                size=(batch_size, self.config.chunk_size, self.config.action_feature.shape[0]),
+                size=(batch_size, self.config.horizon, self.config.action_feature.shape[0]),
                 dtype=dtype,
                 device=device,
                 generator=generator,
@@ -309,16 +309,16 @@ class DiffusionModel(nn.Module):
                 AND/OR
             "observation.environment_state": (B, n_obs_steps, environment_dim)
 
-            "action": (B, chunk_size, action_dim)
-            "action_is_pad": (B, chunk_size)
+            "action": (B, horizon, action_dim)
+            "action_is_pad": (B, horizon)
         }
         """
         # Input validation.
         assert set(batch).issuperset({OBS_STATE, ACTION, "action_is_pad"})
         assert OBS_IMAGES in batch or OBS_ENV_STATE in batch
         n_obs_steps = batch[OBS_STATE].shape[1]
-        chunk_size = batch[ACTION].shape[1]
-        assert chunk_size == self.config.chunk_size
+        horizon = batch[ACTION].shape[1]
+        assert horizon == self.config.horizon
         assert n_obs_steps == self.config.n_obs_steps
 
         # Encode image features and concatenate them all together along with the state vector.
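For reference, a minimal batch that satisfies the shape checks above, assuming the OBS_STATE/OBS_IMAGES/ACTION constants resolve to the usual "observation.state"/"observation.images"/"action" keys; all dimensions are placeholders:

import torch

B, n_obs_steps, horizon = 4, 2, 16
state_dim, action_dim = 2, 3         # placeholder dimensions
num_cameras, C, H, W = 1, 3, 96, 96  # illustrative camera layout

batch = {
    "observation.state": torch.zeros(B, n_obs_steps, state_dim),
    "observation.images": torch.zeros(B, n_obs_steps, num_cameras, C, H, W),
    "action": torch.zeros(B, horizon, action_dim),
    "action_is_pad": torch.zeros(B, horizon, dtype=torch.bool),
}

# Mirrors the asserts in the loss computation above.
assert batch["action"].shape[1] == horizon
assert batch["observation.state"].shape[1] == n_obs_steps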
@@ -1,242 +0,0 @@
# !/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import NormalizationMode
from lerobot.optim.optimizers import MultiAdamConfig
from lerobot.utils.constants import ACTION, OBS_IMAGE, OBS_STATE
def is_image_feature(key: str) -> bool:
    """Check if a feature key represents an image feature.

    Args:
        key: The feature key to check

    Returns:
        True if the key represents an image feature, False otherwise
    """
    return key.startswith(OBS_IMAGE)


@dataclass
class ConcurrencyConfig:
    """Configuration for the concurrency of the actor and learner.

    Possible values are:
    - "threads": Use threads for the actor and learner.
    - "processes": Use processes for the actor and learner.
    """

    actor: str = "threads"
    learner: str = "threads"


@dataclass
class ActorLearnerConfig:
    learner_host: str = "127.0.0.1"
    learner_port: int = 50051
    policy_parameters_push_frequency: int = 4
    queue_get_timeout: float = 2


@dataclass
class CriticNetworkConfig:
    hidden_dims: list[int] = field(default_factory=lambda: [256, 256])
    activate_final: bool = True
    final_activation: str | None = None


@dataclass
class ActorNetworkConfig:
    hidden_dims: list[int] = field(default_factory=lambda: [256, 256])
    activate_final: bool = True
    use_layer_norm: bool = True


@dataclass
class NoiseActorConfig:
    """Configuration for the noise actor in DSRL.

    The noise actor outputs noise that gets fed to the diffusion policy.
    """

    use_tanh_squash: bool = False  # Whether to bound the noise output
    std_min: float = 1e-5
    std_max: float = 2.0
    init_final: float = 0.05


@PreTrainedConfig.register_subclass("dsrl")
@dataclass
class DSRLConfig(PreTrainedConfig):
    """Diffusion Steering via Reinforcement Learning (DSRL) configuration."""

    # Mapping of feature types to normalization modes
    normalization_mapping: dict[str, NormalizationMode] = field(
        default_factory=lambda: {
            "VISUAL": NormalizationMode.MEAN_STD,
            "STATE": NormalizationMode.MIN_MAX,
            "ENV": NormalizationMode.MIN_MAX,
            "ACTION": NormalizationMode.MIN_MAX,
        }
    )

    # Statistics for normalizing different types of inputs
    dataset_stats: dict[str, dict[str, list[float]]] | None = field(
        default_factory=lambda: {
            OBS_IMAGE: {
                "mean": [0.485, 0.456, 0.406],
                "std": [0.229, 0.224, 0.225],
            },
            OBS_STATE: {
                "min": [0.0, 0.0],
                "max": [1.0, 1.0],
            },
            ACTION: {
                "min": [0.0, 0.0, 0.0],
                "max": [1.0, 1.0, 1.0],
            },
        }
    )

    # Architecture specifics
    # Device to run the model on (e.g., "cuda", "cpu")
    device: str = "cpu"
    # Device to store the model on
    storage_device: str = "cpu"
    # Name of the vision encoder model (Set to "helper2424/resnet10" for hil serl resnet10)
    vision_encoder_name: str | None = None
    # Whether to freeze the vision encoder during training
    freeze_vision_encoder: bool = True
    # Hidden dimension size for the image encoder
    image_encoder_hidden_dim: int = 32
    # Whether to use a shared encoder for actor and critic
    shared_encoder: bool = True
    # Number of discrete actions, eg for gripper actions
    num_discrete_actions: int | None = None
    # Dimension of the image embedding pooling
    image_embedding_pooling_dim: int = 8
    # Name of the action policy
    action_policy_name: str = "pi0"
    action_policy_weights: str | None = "lerobot/pi0_base"

    # Training parameter
    # Number of steps for online training
    online_steps: int = 1000000
    # Capacity of the online replay buffer
    online_buffer_capacity: int = 100000
    # Capacity of the offline replay buffer
    offline_buffer_capacity: int = 100000
    # Whether to use asynchronous prefetching for the buffers
    async_prefetch: bool = False
    # Number of steps before learning starts
    online_step_before_learning: int = 100
    # Frequency of policy updates
    policy_update_freq: int = 1

    # SAC algorithm parameters
    discount: float = 0.99
    # Initial temperature value
    temperature_init: float = 1.0
    # Number of critics in the ensemble
    num_critics: int = 2
    # Number of subsampled critics for training
    num_subsample_critics: int | None = None
    # Learning rate for the critic network
    critic_lr: float = 3e-4
    # Learning rate for the actor network
    actor_lr: float = 3e-4
    # Learning rate for the temperature parameter
    temperature_lr: float = 3e-4
    # Weight for the critic target update
    critic_target_update_weight: float = 0.005
    # Update-to-data ratio for the UTD algorithm (If you want enable utd_ratio, you need to set it to >1)
    utd_ratio: int = 1
    # Hidden dimension size for the state encoder
    state_encoder_hidden_dim: int = 256
    # Dimension of the latent space
    latent_dim: int = 256
    # Target entropy for the SAC algorithm
    target_entropy: float | None = None
    # Whether to use backup entropy for the SAC algorithm
    use_backup_entropy: bool = True
    # Gradient clipping norm for the SAC algorithm
    grad_clip_norm: float = 40.0

    # Network configuration
    # Configuration for the critic network architecture
    critic_network_kwargs: CriticNetworkConfig = field(default_factory=CriticNetworkConfig)
    # Configuration for the noise critic network architecture
    noise_critic_network_kwargs: CriticNetworkConfig = field(default_factory=CriticNetworkConfig)
    # Configuration for the noise actor network architecture
    noise_actor_network_kwargs: ActorNetworkConfig = field(default_factory=ActorNetworkConfig)
    # Configuration for the noise actor specific parameters
    noise_actor_kwargs: NoiseActorConfig = field(default_factory=NoiseActorConfig)
    # Configuration for actor-learner architecture
    actor_learner_config: ActorLearnerConfig = field(default_factory=ActorLearnerConfig)
    # Configuration for concurrency settings (you can use threads or processes for the actor and learner)
    concurrency: ConcurrencyConfig = field(default_factory=ConcurrencyConfig)

    # Optimizations
    use_torch_compile: bool = True

    def __post_init__(self):
        super().__post_init__()

    def get_optimizer_preset(self) -> MultiAdamConfig:
        return MultiAdamConfig(
            weight_decay=0.0,
            optimizer_groups={
                "critic_action": {"lr": self.critic_lr},
                "critic_noise": {"lr": self.critic_lr},
                "noise_actor": {"lr": self.actor_lr},
                "temperature": {"lr": self.temperature_lr},
            },
        )

    def get_scheduler_preset(self) -> None:
        return None

    def validate_features(self) -> None:
        has_image = any(is_image_feature(key) for key in self.input_features)
        has_state = OBS_STATE in self.input_features

        if not (has_state or has_image):
            raise ValueError(
                "You must provide either 'observation.state' or an image observation (key starting with 'observation.image') in the input features"
            )

        if ACTION not in self.output_features:
            raise ValueError("You must provide 'action' in the output features")

    @property
    def image_features(self) -> list[str]:
        return [key for key in self.input_features if is_image_feature(key)]

    @property
    def observation_delta_indices(self) -> list:
        return None

    @property
    def action_delta_indices(self) -> list:
        return None

    @property
    def reward_delta_indices(self) -> None:
        return None
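For context on what is being removed: per the NoiseActorConfig docstring above, DSRL steers a frozen diffusion policy by learning which initial noise to hand it, which is why `select_action`/`predict_action_chunk` accepted a `noise` argument before this commit. A rough sketch of the idea (all names and shapes are illustrative):

import torch

# Illustrative dimensions only: obs encoding 256, horizon 16, action_dim 3.
noise_actor = torch.nn.Linear(256, 16 * 3)

obs_encoding = torch.zeros(1, 256)
noise = noise_actor(obs_encoding).reshape(1, 16, 3)  # same (B, horizon, action_dim) shape the model samples

# A frozen diffusion policy would then be seeded with this noise instead of
# torch.randn, e.g. action = diffusion_policy.select_action(batch, noise=noise).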
File diff suppressed because it is too large
@@ -1,89 +0,0 @@
# !/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor for DSRL policy.
DSRL uses a similar processing pipeline as SAC since it operates on
state-action transitions. The main difference is that internally it
also works with noise, but that's handled within the policy itself.
"""
from typing import Any
import torch
from lerobot.policies.dsrl.configuration_dsrl import DSRLConfig
from lerobot.processor import (
    AddBatchDimensionProcessorStep,
    DeviceProcessorStep,
    NormalizerProcessorStep,
    PolicyAction,
    PolicyProcessorPipeline,
    RenameObservationsProcessorStep,
    UnnormalizerProcessorStep,
)
from lerobot.processor.converters import (
    policy_action_to_transition,
    transition_to_policy_action,
)
from lerobot.utils.constants import POLICY_POSTPROCESSOR_DEFAULT_NAME, POLICY_PREPROCESSOR_DEFAULT_NAME


def make_dsrl_pre_post_processors(
    config: DSRLConfig,
    dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None,
) -> tuple[
    PolicyProcessorPipeline[dict, dict],
    PolicyProcessorPipeline[PolicyAction, PolicyAction],
]:
    """Create preprocessor and postprocessor pipelines for DSRL policy.

    Args:
        config: DSRL policy configuration
        dataset_stats: Optional dataset statistics for normalization

    Returns:
        Tuple of (preprocessor, postprocessor) pipelines
    """
    input_steps = [
        RenameObservationsProcessorStep(rename_map={}),
        AddBatchDimensionProcessorStep(),
        DeviceProcessorStep(device=config.device),
        NormalizerProcessorStep(
            features={**config.input_features, **config.output_features},
            norm_map=config.normalization_mapping,
            stats=dataset_stats,
        ),
    ]
    output_steps = [
        UnnormalizerProcessorStep(
            features=config.output_features, norm_map=config.normalization_mapping, stats=dataset_stats
        ),
        DeviceProcessorStep(device="cpu"),
    ]

    return (
        PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
            steps=input_steps,
            name=POLICY_PREPROCESSOR_DEFAULT_NAME,
        ),
        PolicyProcessorPipeline[PolicyAction, PolicyAction](
            steps=output_steps,
            name=POLICY_POSTPROCESSOR_DEFAULT_NAME,
            to_transition=policy_action_to_transition,
            to_output=transition_to_policy_action,
        ),
    )
+2 -24
@@ -30,7 +30,6 @@ from lerobot.envs.configs import EnvConfig
 from lerobot.envs.utils import env_to_policy_features
 from lerobot.policies.act.configuration_act import ACTConfig
 from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig
-from lerobot.policies.dsrl.configuration_dsrl import DSRLConfig
 from lerobot.policies.groot.configuration_groot import GrootConfig
 from lerobot.policies.pi0.configuration_pi0 import PI0Config
 from lerobot.policies.pi05.configuration_pi05 import PI05Config
@@ -60,7 +59,7 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
     Args:
         name: The name of the policy. Supported names are "tdmpc", "diffusion", "act",
-            "vqbet", "pi0", "pi05", "sac", "reward_classifier", "smolvla", "dsrl".
+            "vqbet", "pi0", "pi05", "sac", "reward_classifier", "smolvla".
 
     Returns:
         The policy class corresponding to the given name.
@@ -104,10 +103,6 @@ def get_policy_class(name: str) -> type[PreTrainedPolicy]:
         from lerobot.policies.smolvla.modeling_smolvla import SmolVLAPolicy
 
         return SmolVLAPolicy
-    elif name == "dsrl":
-        from lerobot.policies.dsrl.modeling_dsrl import DSRLPolicy
-
-        return DSRLPolicy
     elif name == "groot":
         from lerobot.policies.groot.modeling_groot import GrootPolicy
@@ -126,7 +121,7 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
     Args:
         policy_type: The type of the policy. Supported types include "tdmpc",
             "diffusion", "act", "vqbet", "pi0", "pi05", "sac", "smolvla",
-            "reward_classifier", "dsrl".
+            "reward_classifier".
         **kwargs: Keyword arguments to be passed to the configuration class constructor.
 
     Returns:
@@ -153,8 +148,6 @@ def make_policy_config(policy_type: str, **kwargs) -> PreTrainedConfig:
         return SmolVLAConfig(**kwargs)
     elif policy_type == "reward_classifier":
         return RewardClassifierConfig(**kwargs)
-    elif policy_type == "dsrl":
-        return DSRLConfig(**kwargs)
     elif policy_type == "groot":
         return GrootConfig(**kwargs)
     else:
@@ -328,21 +321,6 @@ def make_pre_post_processors(
             config=policy_cfg,
             dataset_stats=kwargs.get("dataset_stats"),
         )
-    elif isinstance(policy_cfg, DSRLConfig):
-        from lerobot.policies.dsrl.processor_dsrl import make_dsrl_pre_post_processors
-
-        processors = make_dsrl_pre_post_processors(
-            config=policy_cfg,
-            dataset_stats=kwargs.get("dataset_stats"),
-        )
-    elif isinstance(policy_cfg, GrootConfig):
-        from lerobot.policies.groot.processor_groot import make_groot_pre_post_processors
-
-        processors = make_groot_pre_post_processors(
-            config=policy_cfg,
-            dataset_stats=kwargs.get("dataset_stats"),
-        )
     elif isinstance(policy_cfg, GrootConfig):
         from lerobot.policies.groot.processor_groot import make_groot_pre_post_processors
+2 -2
@@ -1148,7 +1148,7 @@ class PI0Policy(PreTrainedPolicy):
         return self._action_queue.popleft()
 
     @torch.no_grad()
-    def predict_action_chunk(self, batch: dict[str, Tensor], noise: Tensor | None = None) -> Tensor:
+    def predict_action_chunk(self, batch: dict[str, Tensor]) -> Tensor:
         """Predict a chunk of actions given environment observations."""
         self.eval()
@@ -1158,7 +1158,7 @@ class PI0Policy(PreTrainedPolicy):
         state = self.prepare_state(batch)
 
         # Sample actions using the model
-        actions = self.model.sample_actions(images, img_masks, lang_tokens, lang_masks, state, noise)
+        actions = self.model.sample_actions(images, img_masks, lang_tokens, lang_masks, state)
 
         # Unpad actions to actual action dimension
         original_action_dim = self.config.output_features[ACTION].shape[0]
+2 -2
@@ -1120,7 +1120,7 @@ class PI05Policy(PreTrainedPolicy):
         return self._action_queue.popleft()
 
     @torch.no_grad()
-    def predict_action_chunk(self, batch: dict[str, Tensor], noise: Tensor | None = None) -> Tensor:
+    def predict_action_chunk(self, batch: dict[str, Tensor]) -> Tensor:
         """Predict a chunk of actions given environment observations."""
         self.eval()
@@ -1129,7 +1129,7 @@ class PI05Policy(PreTrainedPolicy):
         tokens, masks = batch[f"{OBS_LANGUAGE_TOKENS}"], batch[f"{OBS_LANGUAGE_ATTENTION_MASK}"]
 
         # Sample actions using the model (no separate state needed for PI05)
-        actions = self.model.sample_actions(images, img_masks, tokens, masks, noise)
+        actions = self.model.sample_actions(images, img_masks, tokens, masks)
 
         # Unpad actions to actual action dimension
         original_action_dim = self.config.output_features[ACTION].shape[0]
@@ -0,0 +1,18 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_custom import CustomConfig
from .custom import Custom
@@ -0,0 +1,32 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("custom")
@dataclass
class CustomConfig(TeleoperatorConfig):
"""Custom teleoperator config that dynamically wraps a base teleoperator class.
The base class and its configuration are loaded from a JSON config file at runtime.
Port and baud_rate are taken from the first device in the config file.
"""
config_path: str | None = None # REQUIRED: Path to custom config JSON file
port: str = "/dev/ttyACM0" # Default port
baud_rate: int = 115200 # Default baud rate
+206
@@ -0,0 +1,206 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import json
import logging
from pathlib import Path
from lerobot.motors.motors_bus import MotorNormMode
from ..teleoperator import Teleoperator
from .config_custom import CustomConfig
logger = logging.getLogger(__name__)
class Custom(Teleoperator):
    """
    Custom teleoperator that dynamically wraps a base teleoperator class and applies configurable joint mapping.

    The base class is specified in custom_config.json, allowing flexible teleoperator configurations.
    """

    config_class = CustomConfig
    name = "custom"

    def __init__(self, config: CustomConfig):
        # Load custom configuration from JSON file
        if config.config_path is None:
            raise ValueError(
                "config_path must be provided for custom teleoperator. "
                "Example: --teleop.config_path=/path/to/custom_config.json"
            )

        config_path = Path(config.config_path)
        with open(config_path) as f:
            custom_config = json.load(f)

        logger.info(f"Loaded custom config from {config_path}")
        logger.info(f"Found {len(custom_config)} teleoperator(s): {list(custom_config.keys())}")

        # Initialize the base Teleoperator class
        super().__init__(config)

        # Store multiple base teleoperators and their action mappings
        self.base_teleops = {}
        self.robot_actions_configs = {}

        # Instantiate each base teleoperator from the config
        for device_name, device_config in custom_config.items():
            base_class_name = device_config["base_class"]

            # Create a config copy for this teleoperator
            from dataclasses import replace

            teleop_config = replace(
                config,
                port=device_config.get("port", config.port),
                id=device_config.get("id", f"{config.id}_{device_name}"),
                baud_rate=device_config.get("baud_rate", config.baud_rate),
            )

            logger.info(f"  {device_name}: class={base_class_name}, port={teleop_config.port}, id={teleop_config.id}")

            # Dynamically import and instantiate the base teleoperator class
            module_path, class_name_full = base_class_name.rsplit(".", 1)
            module = importlib.import_module(module_path)
            base_class = getattr(module, class_name_full)

            # Store the teleoperator and its action mapping
            self.base_teleops[device_name] = base_class(teleop_config)
            self.robot_actions_configs[device_name] = device_config["robot_actions"]

    @property
    def action_features(self) -> dict:
        # Aggregate action features from all teleoperators' action mappings
        all_actions = {}
        for device_config in self.robot_actions_configs.values():
            for robot_action in device_config.keys():
                all_actions[robot_action] = float
        return all_actions

    @property
    def feedback_features(self) -> dict:
        # Aggregate feedback features from all base teleoperators
        all_feedback = {}
        for teleop in self.base_teleops.values():
            all_feedback.update(teleop.feedback_features)
        return all_feedback

    @property
    def is_connected(self) -> bool:
        # All teleoperators must be connected
        return all(teleop.is_connected for teleop in self.base_teleops.values())

    @property
    def is_calibrated(self) -> bool:
        # All teleoperators must be calibrated
        return all(teleop.is_calibrated for teleop in self.base_teleops.values())

    def connect(self, calibrate: bool = True) -> None:
        # Connect all base teleoperators
        for device_name, teleop in self.base_teleops.items():
            logger.info(f"Connecting {device_name}...")
            teleop.connect(calibrate=calibrate)

    def calibrate(self) -> None:
        # Calibrate all base teleoperators
        for device_name, teleop in self.base_teleops.items():
            logger.info(f"Calibrating {device_name}...")
            teleop.calibrate()

    def configure(self) -> None:
        # Configure all base teleoperators
        for teleop in self.base_teleops.values():
            teleop.configure()

    def send_feedback(self, feedback: dict[str, float]) -> None:
        # Send feedback to all base teleoperators
        for teleop in self.base_teleops.values():
            teleop.send_feedback(feedback)

    def disconnect(self) -> None:
        # Disconnect all base teleoperators
        for device_name, teleop in self.base_teleops.items():
            logger.info(f"Disconnecting {device_name}...")
            teleop.disconnect()

    def _normalize_to_unit_range(self, teleop, joint_name: str, value: float) -> float:
        """Convert a joint value from base teleoperator's normalization mode to [0, 1] range.

        Args:
            teleop: The base teleoperator instance
            joint_name: Name of the joint (e.g., "shoulder_pitch")
            value: Value in the base teleoperator's normalization mode

        Returns:
            Value normalized to [0, 1] range
        """
        norm_mode = teleop.joints[joint_name]

        if norm_mode == MotorNormMode.RANGE_M100_100:
            # Convert from [-100, 100] to [0, 1]
            return (value + 100.0) / 200.0
        elif norm_mode == MotorNormMode.RANGE_0_100:
            # Convert from [0, 100] to [0, 1]
            return value / 100.0
        elif norm_mode == MotorNormMode.DEGREES:
            # For degrees, we need calibration to know the range.
            # Use calibration min/max to normalize.
            if teleop.calibration and joint_name in teleop.calibration:
                min_deg = teleop.calibration[joint_name].range_min
                max_deg = teleop.calibration[joint_name].range_max
                if max_deg != min_deg:
                    return (value - min_deg) / (max_deg - min_deg)
            # Fallback: assume common range like [-180, 180]
            return (value + 180.0) / 360.0
        else:
            raise ValueError(f"Unknown normalization mode: {norm_mode}")

    def get_action(self) -> dict[str, float]:
        # Build action dict by reading from all base teleoperators
        action = {}

        # Loop through each teleoperator
        for device_name, teleop in self.base_teleops.items():
            # Read joint positions from this teleoperator.
            # These are in the teleoperator's normalization mode (e.g., -100 to 100).
            joint_positions = teleop._read()

            # Get the robot actions config for this teleoperator
            robot_actions_config = self.robot_actions_configs[device_name]

            # Process each robot action for this teleoperator
            for robot_action, config in robot_actions_config.items():
                if config["source"] == "neutral":
                    # Use fixed neutral value (already in [0, 1] range)
                    value = config["value"]
                elif config["source"] == "teleop":
                    # Get value from teleop joint
                    teleop_joint = config["joint"]
                    value = joint_positions[teleop_joint]

                    # Convert from base teleoperator's normalization mode to [0, 1] range
                    value = self._normalize_to_unit_range(teleop, teleop_joint, value)

                    # Apply inversion if specified
                    if config.get("invert", False):
                        value = 1.0 - value
                else:
                    raise ValueError(f"Unknown source '{config['source']}' for robot action '{robot_action}'")

                action[robot_action] = value

        return action
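The arithmetic in `_normalize_to_unit_range` can be verified in isolation; here is a standalone mirror of the three conversions, with the MotorNormMode values reduced to plain strings for illustration:

def to_unit_range(value: float, mode: str) -> float:
    """Standalone mirror of the conversion logic in _normalize_to_unit_range."""
    if mode == "RANGE_M100_100":
        return (value + 100.0) / 200.0  # -100 -> 0.0, 0 -> 0.5, 100 -> 1.0
    if mode == "RANGE_0_100":
        return value / 100.0
    if mode == "DEGREES":
        return (value + 180.0) / 360.0  # fallback [-180, 180] range, no calibration
    raise ValueError(f"Unknown normalization mode: {mode}")


assert to_unit_range(50.0, "RANGE_M100_100") == 0.75
assert to_unit_range(25.0, "RANGE_0_100") == 0.25
assert to_unit_range(0.0, "DEGREES") == 0.5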
@@ -0,0 +1,76 @@
{
    "right_arm": {
        "base_class": "lerobot.teleoperators.homunculus.homunculus_arm.HomunculusArm",
        "port": "/dev/ttyACM0",
        "id": "unitree_right",
        "baud_rate": 115200,
        "robot_actions": {
            "kRightShoulderPitch.pos": {
                "source": "neutral",
                "value": 0.5
            },
            "kRightShoulderRoll.pos": {
                "source": "neutral",
                "value": 0.5
            },
            "kRightShoulderYaw.pos": {
                "source": "neutral",
                "value": 0.5
            },
            "kRightElbow.pos": {
                "source": "neutral",
                "value": 0.5
            },
            "kRightWristRoll.pos": {
                "source": "teleop",
                "joint": "wrist_roll",
                "invert": true
            },
            "kRightWristPitch.pos": {
                "source": "neutral",
                "value": 0.5
            },
            "kRightWristYaw.pos": {
                "source": "neutral",
                "value": 0.5
            }
        }
    },
    "left_arm": {
        "base_class": "lerobot.teleoperators.homunculus.homunculus_arm.HomunculusArm",
        "port": "/dev/ttyACM1",
        "id": "unitree_left",
        "baud_rate": 115200,
        "robot_actions": {
            "kLeftShoulderPitch.pos": {
                "source": "neutral",
                "value": 0.5
            },
            "kLeftShoulderRoll.pos": {
                "source": "neutral",
                "value": 0.5
            },
            "kLeftShoulderYaw.pos": {
                "source": "neutral",
                "value": 0.5
            },
            "kLeftElbow.pos": {
                "source": "neutral",
                "value": 0.5
            },
            "kLeftWristRoll.pos": {
                "source": "teleop",
                "joint": "wrist_roll",
                "invert": true
            },
            "kLeftWristPitch.pos": {
                "source": "neutral",
                "value": 0.5
            },
            "kLeftWristYaw.pos": {
                "source": "neutral",
                "value": 0.5
            }
        }
    }
}
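With a config file like the one above saved to disk, usage would look roughly as follows. The `config_path` argument matches the `--teleop.config_path` flag referenced in the error message in `Custom.__init__`; the import path and the `id` value are assumptions based on where these files land in the tree:

from lerobot.teleoperators.custom import Custom, CustomConfig  # import path assumed from this diff

config = CustomConfig(config_path="/path/to/custom_config.json", id="my_custom_teleop")
teleop = Custom(config)

teleop.connect(calibrate=True)
action = teleop.get_action()  # e.g. {"kRightWristRoll.pos": 0.42, "kRightShoulderPitch.pos": 0.5, ...}
teleop.disconnect()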