Compare commits

..

1 Commits

Author SHA1 Message Date
Martino Russi 6c9d8e9de1 Add custom teleop 2025-11-04 14:58:43 +01:00
6 changed files with 367 additions and 40 deletions
+18 -14
View File
@@ -39,7 +39,6 @@ from lerobot.datasets.aggregate import aggregate_datasets
from lerobot.datasets.compute_stats import aggregate_stats
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.utils import (
DATA_DIR,
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_DATA_PATH,
@@ -963,23 +962,28 @@ def _copy_data_with_feature_changes(
remove_features: list[str] | None = None,
) -> None:
"""Copy data while adding or removing features."""
data_dir = dataset.root / DATA_DIR
parquet_files = sorted(data_dir.glob("*/*.parquet"))
if dataset.meta.episodes is None:
dataset.meta.episodes = load_episodes(dataset.meta.root)
if not parquet_files:
raise ValueError(f"No parquet files found in {data_dir}")
# Map file paths to episode indices to extract chunk/file indices
file_to_episodes: dict[Path, set[int]] = {}
for ep_idx in range(dataset.meta.total_episodes):
file_path = dataset.meta.get_data_file_path(ep_idx)
if file_path not in file_to_episodes:
file_to_episodes[file_path] = set()
file_to_episodes[file_path].add(ep_idx)
frame_idx = 0
for src_path in tqdm(parquet_files, desc="Processing data files"):
df = pd.read_parquet(src_path).reset_index(drop=True)
for src_path in tqdm(sorted(file_to_episodes.keys()), desc="Processing data files"):
df = pd.read_parquet(dataset.root / src_path).reset_index(drop=True)
relative_path = src_path.relative_to(dataset.root)
chunk_dir = relative_path.parts[1]
file_name = relative_path.parts[2]
chunk_idx = int(chunk_dir.split("-")[1])
file_idx = int(file_name.split("-")[1].split(".")[0])
# Get chunk_idx and file_idx from the source file's first episode
episodes_in_file = file_to_episodes[src_path]
first_ep_idx = min(episodes_in_file)
src_ep = dataset.meta.episodes[first_ep_idx]
chunk_idx = src_ep["data/chunk_index"]
file_idx = src_ep["data/file_index"]
if remove_features:
df = df.drop(columns=remove_features, errors="ignore")
@@ -1005,7 +1009,7 @@ def _copy_data_with_feature_changes(
df[feature_name] = feature_slice
frame_idx = end_idx
# Write using the same chunk/file structure as source
# Write using the preserved chunk_idx and file_idx from source
dst_path = new_meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
dst_path.parent.mkdir(parents=True, exist_ok=True)
@@ -50,9 +50,9 @@ from typing import Any
import jsonlines
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import tqdm
from datasets import Dataset, concatenate_datasets
from datasets import Dataset, Features, Image
from huggingface_hub import HfApi, snapshot_download
from requests import HTTPError
@@ -68,7 +68,6 @@ from lerobot.datasets.utils import (
LEGACY_EPISODES_STATS_PATH,
LEGACY_TASKS_PATH,
cast_stats_to_numpy,
embed_images,
flatten_dict,
get_file_size_in_mb,
get_parquet_file_size_in_mb,
@@ -175,33 +174,25 @@ def convert_tasks(root, new_root):
write_tasks(df_tasks, new_root)
def concat_data_files(
paths_to_cat: list[Path], new_root: Path, chunk_idx: int, file_idx: int, image_keys: list[str]
):
"""Concatenate multiple parquet data files into a single file.
Args:
paths_to_cat: List of parquet file paths to concatenate
new_root: Root directory for the new dataset
chunk_idx: Chunk index for the output file
file_idx: File index within the chunk
image_keys: List of feature keys that contain images
"""
datasets_list: list[Dataset] = [Dataset.from_parquet(str(file)) for file in paths_to_cat]
concatenated_ds: Dataset = concatenate_datasets(datasets_list)
if len(image_keys) > 0:
logging.debug(f"Embedding {len(image_keys)} image features for optimal training performance")
concatenated_ds = embed_images(concatenated_ds)
def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
# TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
dataframes = [pd.read_parquet(file) for file in paths_to_cat]
# Concatenate all DataFrames along rows
concatenated_df = pd.concat(dataframes, ignore_index=True)
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True)
table = concatenated_ds.with_format("arrow")[:]
writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
writer.write_table(table)
writer.close()
if len(image_keys) > 0:
schema = pa.Schema.from_pandas(concatenated_df)
features = Features.from_arrow_schema(schema)
for key in image_keys:
features[key] = Image()
schema = features.arrow_schema
else:
schema = None
concatenated_df.to_parquet(path, index=False, schema=schema)
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):
@@ -0,0 +1,18 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_custom import CustomConfig
from .custom import Custom
@@ -0,0 +1,32 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("custom")
@dataclass
class CustomConfig(TeleoperatorConfig):
    """Custom teleoperator config that dynamically wraps a base teleoperator class.

    The base class and its configuration are loaded from a JSON config file at runtime.
    Port and baud_rate are taken from the first device in the config file.
    """

    # REQUIRED: Path to the custom config JSON file. Custom.__init__ raises
    # ValueError when this is left as None.
    config_path: str | None = None  # REQUIRED: Path to custom config JSON file
    # Fallback serial port, used when a device entry in the JSON file does not
    # provide its own "port" key.
    port: str = "/dev/ttyACM0"  # Default port
    # Fallback baud rate, used when a device entry does not provide "baud_rate".
    baud_rate: int = 115200  # Default baud rate
+206
View File
@@ -0,0 +1,206 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import json
import logging
from pathlib import Path
from lerobot.motors.motors_bus import MotorNormMode
from ..teleoperator import Teleoperator
from .config_custom import CustomConfig
logger = logging.getLogger(__name__)
class Custom(Teleoperator):
    """
    Custom teleoperator that dynamically wraps one or more base teleoperator classes
    and applies a configurable joint mapping.

    The base class for each device is specified in the JSON file pointed to by
    ``config.config_path``, allowing flexible teleoperator configurations without
    code changes. Each top-level entry in the JSON maps a device name to a
    ``base_class`` dotted path, optional ``port``/``id``/``baud_rate`` overrides,
    and a ``robot_actions`` mapping consumed by :meth:`get_action`.
    """

    config_class = CustomConfig
    name = "custom"

    def __init__(self, config: CustomConfig):
        """Load the JSON config and instantiate one base teleoperator per device.

        Raises:
            ValueError: If ``config.config_path`` is None.
        """
        # Hoisted out of the per-device loop: the original imported `replace`
        # inside the loop body, re-running the import statement on every iteration.
        from dataclasses import replace

        # Load custom configuration from JSON file.
        if config.config_path is None:
            raise ValueError(
                "config_path must be provided for custom teleoperator. "
                "Example: --teleop.config_path=/path/to/custom_config.json"
            )
        config_path = Path(config.config_path)
        with open(config_path) as f:
            custom_config = json.load(f)
        logger.info(f"Loaded custom config from {config_path}")
        logger.info(f"Found {len(custom_config)} teleoperator(s): {list(custom_config.keys())}")

        # Initialize the base Teleoperator class.
        super().__init__(config)

        # One wrapped teleoperator instance and one robot-action mapping per device name.
        self.base_teleops = {}
        self.robot_actions_configs = {}

        # Instantiate each base teleoperator from the config.
        for device_name, device_config in custom_config.items():
            base_class_name = device_config["base_class"]

            # Per-device config copy: fall back to the top-level port/baud_rate and
            # derive a unique id when the device entry does not override them.
            # NOTE(review): if config.id is None the derived id becomes
            # "None_<device_name>" — confirm callers always pass an explicit id.
            teleop_config = replace(
                config,
                port=device_config.get("port", config.port),
                id=device_config.get("id", f"{config.id}_{device_name}"),
                baud_rate=device_config.get("baud_rate", config.baud_rate)
            )
            logger.info(f" {device_name}: class={base_class_name}, port={teleop_config.port}, id={teleop_config.id}")

            # Dynamically import and instantiate the base teleoperator class from its
            # dotted path, e.g. "pkg.module.ClassName".
            module_path, class_name_full = base_class_name.rsplit(".", 1)
            module = importlib.import_module(module_path)
            base_class = getattr(module, class_name_full)

            # Store the teleoperator and its action mapping.
            self.base_teleops[device_name] = base_class(teleop_config)
            self.robot_actions_configs[device_name] = device_config["robot_actions"]

    @property
    def action_features(self) -> dict:
        """Union of all robot action names declared in the JSON, each typed float."""
        all_actions = {}
        for device_config in self.robot_actions_configs.values():
            for robot_action in device_config.keys():
                all_actions[robot_action] = float
        return all_actions

    @property
    def feedback_features(self) -> dict:
        """Union of the feedback features of every wrapped teleoperator."""
        all_feedback = {}
        for teleop in self.base_teleops.values():
            all_feedback.update(teleop.feedback_features)
        return all_feedback

    @property
    def is_connected(self) -> bool:
        # All wrapped teleoperators must be connected.
        return all(teleop.is_connected for teleop in self.base_teleops.values())

    @property
    def is_calibrated(self) -> bool:
        # All wrapped teleoperators must be calibrated.
        return all(teleop.is_calibrated for teleop in self.base_teleops.values())

    def connect(self, calibrate: bool = True) -> None:
        """Connect every wrapped teleoperator, optionally calibrating on connect."""
        for device_name, teleop in self.base_teleops.items():
            logger.info(f"Connecting {device_name}...")
            teleop.connect(calibrate=calibrate)

    def calibrate(self) -> None:
        """Calibrate every wrapped teleoperator."""
        for device_name, teleop in self.base_teleops.items():
            logger.info(f"Calibrating {device_name}...")
            teleop.calibrate()

    def configure(self) -> None:
        """Configure every wrapped teleoperator."""
        for teleop in self.base_teleops.values():
            teleop.configure()

    def send_feedback(self, feedback: dict[str, float]) -> None:
        """Forward the same feedback dict to every wrapped teleoperator."""
        for teleop in self.base_teleops.values():
            teleop.send_feedback(feedback)

    def disconnect(self) -> None:
        """Disconnect every wrapped teleoperator."""
        for device_name, teleop in self.base_teleops.items():
            logger.info(f"Disconnecting {device_name}...")
            teleop.disconnect()

    def _normalize_to_unit_range(self, teleop, joint_name: str, value: float) -> float:
        """Convert a joint value from base teleoperator's normalization mode to [0, 1] range.

        Args:
            teleop: The base teleoperator instance. Assumed to expose a `joints`
                mapping of joint name -> MotorNormMode and, for DEGREES mode, an
                optional `calibration` mapping — TODO confirm against the wrapped class.
            joint_name: Name of the joint (e.g., "shoulder_pitch")
            value: Value in the base teleoperator's normalization mode

        Returns:
            Value normalized to [0, 1] range

        Raises:
            ValueError: If the joint's normalization mode is not recognized.
        """
        norm_mode = teleop.joints[joint_name]
        if norm_mode == MotorNormMode.RANGE_M100_100:
            # Convert from [-100, 100] to [0, 1]
            return (value + 100.0) / 200.0
        elif norm_mode == MotorNormMode.RANGE_0_100:
            # Convert from [0, 100] to [0, 1]
            return value / 100.0
        elif norm_mode == MotorNormMode.DEGREES:
            # For degrees we need calibration to know the range; use the
            # per-joint calibration min/max when available.
            if teleop.calibration and joint_name in teleop.calibration:
                min_deg = teleop.calibration[joint_name].range_min
                max_deg = teleop.calibration[joint_name].range_max
                if max_deg != min_deg:  # guard against zero-width range
                    return (value - min_deg) / (max_deg - min_deg)
            # Fallback: assume common range like [-180, 180]
            return (value + 180.0) / 360.0
        else:
            raise ValueError(f"Unknown normalization mode: {norm_mode}")

    def get_action(self) -> dict[str, float]:
        """Build the robot action dict by reading every wrapped teleoperator.

        Each robot action is produced either from a fixed neutral value
        (source "neutral", already in [0, 1]) or from a teleop joint reading
        (source "teleop") normalized to [0, 1] and optionally inverted.

        Raises:
            ValueError: If an action's "source" is neither "neutral" nor "teleop".
        """
        action = {}
        for device_name, teleop in self.base_teleops.items():
            # Raw joint positions in the teleoperator's own normalization mode
            # (e.g. -100 to 100). NOTE(review): relies on the wrapped class's
            # private _read() method — confirm it is a stable API.
            joint_positions = teleop._read()

            robot_actions_config = self.robot_actions_configs[device_name]
            # Renamed from `config` to avoid confusion with the constructor's
            # CustomConfig parameter of the same name.
            for robot_action, action_cfg in robot_actions_config.items():
                if action_cfg["source"] == "neutral":
                    # Use fixed neutral value (already in [0, 1] range).
                    value = action_cfg["value"]
                elif action_cfg["source"] == "teleop":
                    # Read the mapped joint and normalize to [0, 1].
                    teleop_joint = action_cfg["joint"]
                    value = joint_positions[teleop_joint]
                    value = self._normalize_to_unit_range(teleop, teleop_joint, value)
                    # Apply inversion if specified.
                    if action_cfg.get("invert", False):
                        value = 1.0 - value
                else:
                    raise ValueError(f"Unknown source '{action_cfg['source']}' for robot action '{robot_action}'")
                action[robot_action] = value
        return action
@@ -0,0 +1,76 @@
{
"right_arm": {
"base_class": "lerobot.teleoperators.homunculus.homunculus_arm.HomunculusArm",
"port": "/dev/ttyACM0",
"id": "unitree_right",
"baud_rate": 115200,
"robot_actions": {
"kRightShoulderPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kRightShoulderRoll.pos": {
"source": "neutral",
"value": 0.5
},
"kRightShoulderYaw.pos": {
"source": "neutral",
"value": 0.5
},
"kRightElbow.pos": {
"source": "neutral",
"value": 0.5
},
"kRightWristRoll.pos": {
"source": "teleop",
"joint": "wrist_roll",
"invert": true
},
"kRightWristPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kRightWristYaw.pos": {
"source": "neutral",
"value": 0.5
}
}
},
"left_arm": {
"base_class": "lerobot.teleoperators.homunculus.homunculus_arm.HomunculusArm",
"port": "/dev/ttyACM1",
"id": "unitree_left",
"baud_rate": 115200,
"robot_actions": {
"kLeftShoulderPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftShoulderRoll.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftShoulderYaw.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftElbow.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftWristRoll.pos": {
"source": "teleop",
"joint": "wrist_roll",
"invert": true
},
"kLeftWristPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftWristyaw.pos": {
"source": "neutral",
"value": 0.5
}
}
}
}