Compare commits

..

1 Commits

Author SHA1 Message Date
Martino Russi 6c9d8e9de1 Add custom teleop 2025-11-04 14:58:43 +01:00
6 changed files with 367 additions and 40 deletions
+18 -14
View File
@@ -39,7 +39,6 @@ from lerobot.datasets.aggregate import aggregate_datasets
from lerobot.datasets.compute_stats import aggregate_stats from lerobot.datasets.compute_stats import aggregate_stats
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.utils import ( from lerobot.datasets.utils import (
DATA_DIR,
DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_FILE_SIZE_IN_MB, DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_DATA_PATH, DEFAULT_DATA_PATH,
@@ -963,23 +962,28 @@ def _copy_data_with_feature_changes(
remove_features: list[str] | None = None, remove_features: list[str] | None = None,
) -> None: ) -> None:
"""Copy data while adding or removing features.""" """Copy data while adding or removing features."""
data_dir = dataset.root / DATA_DIR if dataset.meta.episodes is None:
parquet_files = sorted(data_dir.glob("*/*.parquet")) dataset.meta.episodes = load_episodes(dataset.meta.root)
if not parquet_files: # Map file paths to episode indices to extract chunk/file indices
raise ValueError(f"No parquet files found in {data_dir}") file_to_episodes: dict[Path, set[int]] = {}
for ep_idx in range(dataset.meta.total_episodes):
file_path = dataset.meta.get_data_file_path(ep_idx)
if file_path not in file_to_episodes:
file_to_episodes[file_path] = set()
file_to_episodes[file_path].add(ep_idx)
frame_idx = 0 frame_idx = 0
for src_path in tqdm(parquet_files, desc="Processing data files"): for src_path in tqdm(sorted(file_to_episodes.keys()), desc="Processing data files"):
df = pd.read_parquet(src_path).reset_index(drop=True) df = pd.read_parquet(dataset.root / src_path).reset_index(drop=True)
relative_path = src_path.relative_to(dataset.root) # Get chunk_idx and file_idx from the source file's first episode
chunk_dir = relative_path.parts[1] episodes_in_file = file_to_episodes[src_path]
file_name = relative_path.parts[2] first_ep_idx = min(episodes_in_file)
src_ep = dataset.meta.episodes[first_ep_idx]
chunk_idx = int(chunk_dir.split("-")[1]) chunk_idx = src_ep["data/chunk_index"]
file_idx = int(file_name.split("-")[1].split(".")[0]) file_idx = src_ep["data/file_index"]
if remove_features: if remove_features:
df = df.drop(columns=remove_features, errors="ignore") df = df.drop(columns=remove_features, errors="ignore")
@@ -1005,7 +1009,7 @@ def _copy_data_with_feature_changes(
df[feature_name] = feature_slice df[feature_name] = feature_slice
frame_idx = end_idx frame_idx = end_idx
# Write using the same chunk/file structure as source # Write using the preserved chunk_idx and file_idx from source
dst_path = new_meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx) dst_path = new_meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
dst_path.parent.mkdir(parents=True, exist_ok=True) dst_path.parent.mkdir(parents=True, exist_ok=True)
@@ -50,9 +50,9 @@ from typing import Any
import jsonlines import jsonlines
import pandas as pd import pandas as pd
import pyarrow.parquet as pq import pyarrow as pa
import tqdm import tqdm
from datasets import Dataset, concatenate_datasets from datasets import Dataset, Features, Image
from huggingface_hub import HfApi, snapshot_download from huggingface_hub import HfApi, snapshot_download
from requests import HTTPError from requests import HTTPError
@@ -68,7 +68,6 @@ from lerobot.datasets.utils import (
LEGACY_EPISODES_STATS_PATH, LEGACY_EPISODES_STATS_PATH,
LEGACY_TASKS_PATH, LEGACY_TASKS_PATH,
cast_stats_to_numpy, cast_stats_to_numpy,
embed_images,
flatten_dict, flatten_dict,
get_file_size_in_mb, get_file_size_in_mb,
get_parquet_file_size_in_mb, get_parquet_file_size_in_mb,
@@ -175,33 +174,25 @@ def convert_tasks(root, new_root):
write_tasks(df_tasks, new_root) write_tasks(df_tasks, new_root)
def concat_data_files( def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
paths_to_cat: list[Path], new_root: Path, chunk_idx: int, file_idx: int, image_keys: list[str] # TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
): dataframes = [pd.read_parquet(file) for file in paths_to_cat]
"""Concatenate multiple parquet data files into a single file. # Concatenate all DataFrames along rows
concatenated_df = pd.concat(dataframes, ignore_index=True)
Args:
paths_to_cat: List of parquet file paths to concatenate
new_root: Root directory for the new dataset
chunk_idx: Chunk index for the output file
file_idx: File index within the chunk
image_keys: List of feature keys that contain images
"""
datasets_list: list[Dataset] = [Dataset.from_parquet(str(file)) for file in paths_to_cat]
concatenated_ds: Dataset = concatenate_datasets(datasets_list)
if len(image_keys) > 0:
logging.debug(f"Embedding {len(image_keys)} image features for optimal training performance")
concatenated_ds = embed_images(concatenated_ds)
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx) path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
table = concatenated_ds.with_format("arrow")[:] if len(image_keys) > 0:
writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True) schema = pa.Schema.from_pandas(concatenated_df)
writer.write_table(table) features = Features.from_arrow_schema(schema)
writer.close() for key in image_keys:
features[key] = Image()
schema = features.arrow_schema
else:
schema = None
concatenated_df.to_parquet(path, index=False, schema=schema)
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int): def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):
@@ -0,0 +1,18 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_custom import CustomConfig
from .custom import Custom
@@ -0,0 +1,32 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("custom")
@dataclass
class CustomConfig(TeleoperatorConfig):
"""Custom teleoperator config that dynamically wraps a base teleoperator class.
The base class and its configuration are loaded from a JSON config file at runtime.
Port and baud_rate are taken from the first device in the config file.
"""
config_path: str | None = None # REQUIRED: Path to custom config JSON file
port: str = "/dev/ttyACM0" # Default port
baud_rate: int = 115200 # Default baud rate
+206
View File
@@ -0,0 +1,206 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import json
import logging
from pathlib import Path
from lerobot.motors.motors_bus import MotorNormMode
from ..teleoperator import Teleoperator
from .config_custom import CustomConfig
logger = logging.getLogger(__name__)
class Custom(Teleoperator):
"""
Custom teleoperator that dynamically wraps a base teleoperator class and applies configurable joint mapping.
The base class is specified in custom_config.json, allowing flexible teleoperator configurations.
"""
config_class = CustomConfig
name = "custom"
def __init__(self, config: CustomConfig):
# Load custom configuration from JSON file
if config.config_path is None:
raise ValueError(
"config_path must be provided for custom teleoperator. "
"Example: --teleop.config_path=/path/to/custom_config.json"
)
config_path = Path(config.config_path)
with open(config_path) as f:
custom_config = json.load(f)
logger.info(f"Loaded custom config from {config_path}")
logger.info(f"Found {len(custom_config)} teleoperator(s): {list(custom_config.keys())}")
# Initialize the base Teleoperator class
super().__init__(config)
# Store multiple base teleoperators and their action mappings
self.base_teleops = {}
self.robot_actions_configs = {}
# Instantiate each base teleoperator from the config
for device_name, device_config in custom_config.items():
base_class_name = device_config["base_class"]
# Create a config copy for this teleoperator
from dataclasses import replace
teleop_config = replace(
config,
port=device_config.get("port", config.port),
id=device_config.get("id", f"{config.id}_{device_name}"),
baud_rate=device_config.get("baud_rate", config.baud_rate)
)
logger.info(f" {device_name}: class={base_class_name}, port={teleop_config.port}, id={teleop_config.id}")
# Dynamically import and instantiate the base teleoperator class
module_path, class_name_full = base_class_name.rsplit(".", 1)
module = importlib.import_module(module_path)
base_class = getattr(module, class_name_full)
# Store the teleoperator and its action mapping
self.base_teleops[device_name] = base_class(teleop_config)
self.robot_actions_configs[device_name] = device_config["robot_actions"]
@property
def action_features(self) -> dict:
# Aggregate action features from all teleoperators' action mappings
all_actions = {}
for device_config in self.robot_actions_configs.values():
for robot_action in device_config.keys():
all_actions[robot_action] = float
return all_actions
@property
def feedback_features(self) -> dict:
# Aggregate feedback features from all base teleoperators
all_feedback = {}
for teleop in self.base_teleops.values():
all_feedback.update(teleop.feedback_features)
return all_feedback
@property
def is_connected(self) -> bool:
# All teleoperators must be connected
return all(teleop.is_connected for teleop in self.base_teleops.values())
@property
def is_calibrated(self) -> bool:
# All teleoperators must be calibrated
return all(teleop.is_calibrated for teleop in self.base_teleops.values())
def connect(self, calibrate: bool = True) -> None:
# Connect all base teleoperators
for device_name, teleop in self.base_teleops.items():
logger.info(f"Connecting {device_name}...")
teleop.connect(calibrate=calibrate)
def calibrate(self) -> None:
# Calibrate all base teleoperators
for device_name, teleop in self.base_teleops.items():
logger.info(f"Calibrating {device_name}...")
teleop.calibrate()
def configure(self) -> None:
# Configure all base teleoperators
for teleop in self.base_teleops.values():
teleop.configure()
def send_feedback(self, feedback: dict[str, float]) -> None:
# Send feedback to all base teleoperators
for teleop in self.base_teleops.values():
teleop.send_feedback(feedback)
def disconnect(self) -> None:
# Disconnect all base teleoperators
for device_name, teleop in self.base_teleops.items():
logger.info(f"Disconnecting {device_name}...")
teleop.disconnect()
def _normalize_to_unit_range(self, teleop, joint_name: str, value: float) -> float:
"""Convert a joint value from base teleoperator's normalization mode to [0, 1] range.
Args:
teleop: The base teleoperator instance
joint_name: Name of the joint (e.g., "shoulder_pitch")
value: Value in the base teleoperator's normalization mode
Returns:
Value normalized to [0, 1] range
"""
norm_mode = teleop.joints[joint_name]
if norm_mode == MotorNormMode.RANGE_M100_100:
# Convert from [-100, 100] to [0, 1]
return (value + 100.0) / 200.0
elif norm_mode == MotorNormMode.RANGE_0_100:
# Convert from [0, 100] to [0, 1]
return value / 100.0
elif norm_mode == MotorNormMode.DEGREES:
# For degrees, we need calibration to know the range
# Use calibration min/max to normalize
if teleop.calibration and joint_name in teleop.calibration:
min_deg = teleop.calibration[joint_name].range_min
max_deg = teleop.calibration[joint_name].range_max
if max_deg != min_deg:
return (value - min_deg) / (max_deg - min_deg)
# Fallback: assume common range like [-180, 180]
return (value + 180.0) / 360.0
else:
raise ValueError(f"Unknown normalization mode: {norm_mode}")
def get_action(self) -> dict[str, float]:
# Build action dict by reading from all base teleoperators
action = {}
# Loop through each teleoperator
for device_name, teleop in self.base_teleops.items():
# Read joint positions from this teleoperator
# These are in the teleoperator's normalization mode (e.g., -100 to 100)
joint_positions = teleop._read()
# Get the robot actions config for this teleoperator
robot_actions_config = self.robot_actions_configs[device_name]
# Process each robot action for this teleoperator
for robot_action, config in robot_actions_config.items():
if config["source"] == "neutral":
# Use fixed neutral value (already in [0, 1] range)
value = config["value"]
elif config["source"] == "teleop":
# Get value from teleop joint
teleop_joint = config["joint"]
value = joint_positions[teleop_joint]
# Convert from base teleoperator's normalization mode to [0, 1] range
value = self._normalize_to_unit_range(teleop, teleop_joint, value)
# Apply inversion if specified
if config.get("invert", False):
value = 1.0 - value
else:
raise ValueError(f"Unknown source '{config['source']}' for robot action '{robot_action}'")
action[robot_action] = value
return action
@@ -0,0 +1,76 @@
{
"right_arm": {
"base_class": "lerobot.teleoperators.homunculus.homunculus_arm.HomunculusArm",
"port": "/dev/ttyACM0",
"id": "unitree_right",
"baud_rate": 115200,
"robot_actions": {
"kRightShoulderPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kRightShoulderRoll.pos": {
"source": "neutral",
"value": 0.5
},
"kRightShoulderYaw.pos": {
"source": "neutral",
"value": 0.5
},
"kRightElbow.pos": {
"source": "neutral",
"value": 0.5
},
"kRightWristRoll.pos": {
"source": "teleop",
"joint": "wrist_roll",
"invert": true
},
"kRightWristPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kRightWristYaw.pos": {
"source": "neutral",
"value": 0.5
}
}
},
"left_arm": {
"base_class": "lerobot.teleoperators.homunculus.homunculus_arm.HomunculusArm",
"port": "/dev/ttyACM1",
"id": "unitree_left",
"baud_rate": 115200,
"robot_actions": {
"kLeftShoulderPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftShoulderRoll.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftShoulderYaw.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftElbow.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftWristRoll.pos": {
"source": "teleop",
"joint": "wrist_roll",
"invert": true
},
"kLeftWristPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftWristyaw.pos": {
"source": "neutral",
"value": 0.5
}
}
}
}