Compare commits

..

2 Commits

Author SHA1 Message Date
CarolinePascal ee50b0f24b feat(WIP): adding support for multi-process camera video capture 2025-11-04 19:51:58 +01:00
Michel Aractingi f6b16f6d97 fix(dataset_tools) Critical bug in modify features (#2342)
* fix bug in `_copy_data_with_feature_changes`

* Update src/lerobot/datasets/dataset_tools.py

Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
Signed-off-by: Michel Aractingi <michel.aractingi@huggingface.co>

* add missing import

---------

Signed-off-by: Michel Aractingi <michel.aractingi@huggingface.co>
Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
2025-11-04 15:56:41 +01:00
6 changed files with 52 additions and 380 deletions
+38 -30
View File
@@ -23,6 +23,8 @@ import platform
import time
from pathlib import Path
from threading import Event, Lock, Thread
from multiprocessing import Process, Event as EventProcess, JoinableQueue as Queue
from queue import Empty
from typing import Any
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
@@ -119,11 +121,10 @@ class OpenCVCamera(Camera):
self.videocapture: cv2.VideoCapture | None = None
self.thread: Thread | None = None
self.stop_event: Event | None = None
self.frame_lock: Lock = Lock()
self.process: Process | None = None
self.stop_event: EventProcess | None = None
self.frame_queue: Queue = Queue()
self.latest_frame: NDArray[Any] | None = None
self.new_frame_event: Event = Event()
self.rotation: int | None = get_cv2_rotation(config.rotation)
self.backend: int = get_cv2_backend()
@@ -442,37 +443,36 @@ class OpenCVCamera(Camera):
while not self.stop_event.is_set():
try:
color_image = self.read()
with self.frame_lock:
self.latest_frame = color_image
self.new_frame_event.set()
self.frame_queue.put_nowait(color_image)
except DeviceNotConnectedError:
break
except Exception as e:
logger.warning(f"Error reading frame in background thread for {self}: {e}")
def _start_read_thread(self) -> None:
def _start_read_process(self) -> None:
"""Starts or restarts the background read thread if it's not running."""
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=0.1)
if self.process is not None and self.process.is_alive():
self.frame_queue.join()
self.process.join()
if self.stop_event is not None:
self.stop_event.set()
self.stop_event = Event()
self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop")
self.thread.daemon = True
self.thread.start()
self.process = Process(target=self._read_loop, args=(), name=f"{self}_read_loop")
self.process.daemon = True
self.process.start()
def _stop_read_thread(self) -> None:
"""Signals the background read thread to stop and waits for it to join."""
if self.stop_event is not None:
self.stop_event.set()
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=2.0)
if self.process is not None and self.process.is_alive():
self.frame_queue.join()
self.process.join()
self.thread = None
self.process = None
self.stop_event = None
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
@@ -499,24 +499,32 @@ class OpenCVCamera(Camera):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
self._start_read_thread()
if self.process is None or not self.process.is_alive():
self._start_read_process()
if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0):
thread_alive = self.thread is not None and self.thread.is_alive()
raise TimeoutError(
f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. "
f"Read thread alive: {thread_alive}."
)
if self.latest_frame is None:
self.latest_frame = self.frame_queue.get()
self.frame_queue.task_done()
return self.latest_frame
with self.frame_lock:
frame = self.latest_frame
self.new_frame_event.clear()
try:
frame = self.frame_queue.get(timeout=timeout_ms / 1000.0)
self.frame_queue.task_done()
except Empty:
process_alive = self.process is not None and self.process.is_alive()
if process_alive:
logger.warning(f"{self} async_read timed out after {timeout_ms} ms but camera is still running.")
return self.latest_frame
else:
raise TimeoutError(
f"{self} async_read timed out after {timeout_ms} ms: camera is not responding !"
)
if frame is None:
raise RuntimeError(f"Internal error: Event set but no frame available for {self}.")
return frame
else:
self.latest_frame = frame
return self.latest_frame
def disconnect(self) -> None:
"""
+14 -18
View File
@@ -39,6 +39,7 @@ from lerobot.datasets.aggregate import aggregate_datasets
from lerobot.datasets.compute_stats import aggregate_stats
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.utils import (
DATA_DIR,
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_DATA_PATH,
@@ -962,28 +963,23 @@ def _copy_data_with_feature_changes(
remove_features: list[str] | None = None,
) -> None:
"""Copy data while adding or removing features."""
if dataset.meta.episodes is None:
dataset.meta.episodes = load_episodes(dataset.meta.root)
data_dir = dataset.root / DATA_DIR
parquet_files = sorted(data_dir.glob("*/*.parquet"))
# Map file paths to episode indices to extract chunk/file indices
file_to_episodes: dict[Path, set[int]] = {}
for ep_idx in range(dataset.meta.total_episodes):
file_path = dataset.meta.get_data_file_path(ep_idx)
if file_path not in file_to_episodes:
file_to_episodes[file_path] = set()
file_to_episodes[file_path].add(ep_idx)
if not parquet_files:
raise ValueError(f"No parquet files found in {data_dir}")
frame_idx = 0
for src_path in tqdm(sorted(file_to_episodes.keys()), desc="Processing data files"):
df = pd.read_parquet(dataset.root / src_path).reset_index(drop=True)
for src_path in tqdm(parquet_files, desc="Processing data files"):
df = pd.read_parquet(src_path).reset_index(drop=True)
# Get chunk_idx and file_idx from the source file's first episode
episodes_in_file = file_to_episodes[src_path]
first_ep_idx = min(episodes_in_file)
src_ep = dataset.meta.episodes[first_ep_idx]
chunk_idx = src_ep["data/chunk_index"]
file_idx = src_ep["data/file_index"]
relative_path = src_path.relative_to(dataset.root)
chunk_dir = relative_path.parts[1]
file_name = relative_path.parts[2]
chunk_idx = int(chunk_dir.split("-")[1])
file_idx = int(file_name.split("-")[1].split(".")[0])
if remove_features:
df = df.drop(columns=remove_features, errors="ignore")
@@ -1009,7 +1005,7 @@ def _copy_data_with_feature_changes(
df[feature_name] = feature_slice
frame_idx = end_idx
# Write using the preserved chunk_idx and file_idx from source
# Write using the same chunk/file structure as source
dst_path = new_meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
dst_path.parent.mkdir(parents=True, exist_ok=True)
@@ -1,18 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_custom import CustomConfig
from .custom import Custom
@@ -1,32 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("custom")
@dataclass
class CustomConfig(TeleoperatorConfig):
"""Custom teleoperator config that dynamically wraps a base teleoperator class.
The base class and its configuration are loaded from a JSON config file at runtime.
Port and baud_rate are taken from the first device in the config file.
"""
config_path: str | None = None # REQUIRED: Path to custom config JSON file
port: str = "/dev/ttyACM0" # Default port
baud_rate: int = 115200 # Default baud rate
-206
View File
@@ -1,206 +0,0 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import json
import logging
from pathlib import Path
from lerobot.motors.motors_bus import MotorNormMode
from ..teleoperator import Teleoperator
from .config_custom import CustomConfig
logger = logging.getLogger(__name__)
class Custom(Teleoperator):
"""
Custom teleoperator that dynamically wraps a base teleoperator class and applies configurable joint mapping.
The base class is specified in custom_config.json, allowing flexible teleoperator configurations.
"""
config_class = CustomConfig
name = "custom"
def __init__(self, config: CustomConfig):
# Load custom configuration from JSON file
if config.config_path is None:
raise ValueError(
"config_path must be provided for custom teleoperator. "
"Example: --teleop.config_path=/path/to/custom_config.json"
)
config_path = Path(config.config_path)
with open(config_path) as f:
custom_config = json.load(f)
logger.info(f"Loaded custom config from {config_path}")
logger.info(f"Found {len(custom_config)} teleoperator(s): {list(custom_config.keys())}")
# Initialize the base Teleoperator class
super().__init__(config)
# Store multiple base teleoperators and their action mappings
self.base_teleops = {}
self.robot_actions_configs = {}
# Instantiate each base teleoperator from the config
for device_name, device_config in custom_config.items():
base_class_name = device_config["base_class"]
# Create a config copy for this teleoperator
from dataclasses import replace
teleop_config = replace(
config,
port=device_config.get("port", config.port),
id=device_config.get("id", f"{config.id}_{device_name}"),
baud_rate=device_config.get("baud_rate", config.baud_rate)
)
logger.info(f" {device_name}: class={base_class_name}, port={teleop_config.port}, id={teleop_config.id}")
# Dynamically import and instantiate the base teleoperator class
module_path, class_name_full = base_class_name.rsplit(".", 1)
module = importlib.import_module(module_path)
base_class = getattr(module, class_name_full)
# Store the teleoperator and its action mapping
self.base_teleops[device_name] = base_class(teleop_config)
self.robot_actions_configs[device_name] = device_config["robot_actions"]
@property
def action_features(self) -> dict:
# Aggregate action features from all teleoperators' action mappings
all_actions = {}
for device_config in self.robot_actions_configs.values():
for robot_action in device_config.keys():
all_actions[robot_action] = float
return all_actions
@property
def feedback_features(self) -> dict:
# Aggregate feedback features from all base teleoperators
all_feedback = {}
for teleop in self.base_teleops.values():
all_feedback.update(teleop.feedback_features)
return all_feedback
@property
def is_connected(self) -> bool:
# All teleoperators must be connected
return all(teleop.is_connected for teleop in self.base_teleops.values())
@property
def is_calibrated(self) -> bool:
# All teleoperators must be calibrated
return all(teleop.is_calibrated for teleop in self.base_teleops.values())
def connect(self, calibrate: bool = True) -> None:
# Connect all base teleoperators
for device_name, teleop in self.base_teleops.items():
logger.info(f"Connecting {device_name}...")
teleop.connect(calibrate=calibrate)
def calibrate(self) -> None:
# Calibrate all base teleoperators
for device_name, teleop in self.base_teleops.items():
logger.info(f"Calibrating {device_name}...")
teleop.calibrate()
def configure(self) -> None:
# Configure all base teleoperators
for teleop in self.base_teleops.values():
teleop.configure()
def send_feedback(self, feedback: dict[str, float]) -> None:
# Send feedback to all base teleoperators
for teleop in self.base_teleops.values():
teleop.send_feedback(feedback)
def disconnect(self) -> None:
# Disconnect all base teleoperators
for device_name, teleop in self.base_teleops.items():
logger.info(f"Disconnecting {device_name}...")
teleop.disconnect()
def _normalize_to_unit_range(self, teleop, joint_name: str, value: float) -> float:
"""Convert a joint value from base teleoperator's normalization mode to [0, 1] range.
Args:
teleop: The base teleoperator instance
joint_name: Name of the joint (e.g., "shoulder_pitch")
value: Value in the base teleoperator's normalization mode
Returns:
Value normalized to [0, 1] range
"""
norm_mode = teleop.joints[joint_name]
if norm_mode == MotorNormMode.RANGE_M100_100:
# Convert from [-100, 100] to [0, 1]
return (value + 100.0) / 200.0
elif norm_mode == MotorNormMode.RANGE_0_100:
# Convert from [0, 100] to [0, 1]
return value / 100.0
elif norm_mode == MotorNormMode.DEGREES:
# For degrees, we need calibration to know the range
# Use calibration min/max to normalize
if teleop.calibration and joint_name in teleop.calibration:
min_deg = teleop.calibration[joint_name].range_min
max_deg = teleop.calibration[joint_name].range_max
if max_deg != min_deg:
return (value - min_deg) / (max_deg - min_deg)
# Fallback: assume common range like [-180, 180]
return (value + 180.0) / 360.0
else:
raise ValueError(f"Unknown normalization mode: {norm_mode}")
def get_action(self) -> dict[str, float]:
# Build action dict by reading from all base teleoperators
action = {}
# Loop through each teleoperator
for device_name, teleop in self.base_teleops.items():
# Read joint positions from this teleoperator
# These are in the teleoperator's normalization mode (e.g., -100 to 100)
joint_positions = teleop._read()
# Get the robot actions config for this teleoperator
robot_actions_config = self.robot_actions_configs[device_name]
# Process each robot action for this teleoperator
for robot_action, config in robot_actions_config.items():
if config["source"] == "neutral":
# Use fixed neutral value (already in [0, 1] range)
value = config["value"]
elif config["source"] == "teleop":
# Get value from teleop joint
teleop_joint = config["joint"]
value = joint_positions[teleop_joint]
# Convert from base teleoperator's normalization mode to [0, 1] range
value = self._normalize_to_unit_range(teleop, teleop_joint, value)
# Apply inversion if specified
if config.get("invert", False):
value = 1.0 - value
else:
raise ValueError(f"Unknown source '{config['source']}' for robot action '{robot_action}'")
action[robot_action] = value
return action
@@ -1,76 +0,0 @@
{
"right_arm": {
"base_class": "lerobot.teleoperators.homunculus.homunculus_arm.HomunculusArm",
"port": "/dev/ttyACM0",
"id": "unitree_right",
"baud_rate": 115200,
"robot_actions": {
"kRightShoulderPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kRightShoulderRoll.pos": {
"source": "neutral",
"value": 0.5
},
"kRightShoulderYaw.pos": {
"source": "neutral",
"value": 0.5
},
"kRightElbow.pos": {
"source": "neutral",
"value": 0.5
},
"kRightWristRoll.pos": {
"source": "teleop",
"joint": "wrist_roll",
"invert": true
},
"kRightWristPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kRightWristYaw.pos": {
"source": "neutral",
"value": 0.5
}
}
},
"left_arm": {
"base_class": "lerobot.teleoperators.homunculus.homunculus_arm.HomunculusArm",
"port": "/dev/ttyACM1",
"id": "unitree_left",
"baud_rate": 115200,
"robot_actions": {
"kLeftShoulderPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftShoulderRoll.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftShoulderYaw.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftElbow.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftWristRoll.pos": {
"source": "teleop",
"joint": "wrist_roll",
"invert": true
},
"kLeftWristPitch.pos": {
"source": "neutral",
"value": 0.5
},
"kLeftWristyaw.pos": {
"source": "neutral",
"value": 0.5
}
}
}
}