Compare commits

..

1 Commits

Author SHA1 Message Date
CarolinePascal ee50b0f24b feat(WIP): adding support for multi-process camera video capture 2025-11-04 19:51:58 +01:00
2 changed files with 55 additions and 56 deletions
+37 -29
View File
@@ -23,6 +23,8 @@ import platform
import time import time
from pathlib import Path from pathlib import Path
from threading import Event, Lock, Thread from threading import Event, Lock, Thread
from multiprocessing import Process, Event as EventProcess, JoinableQueue as Queue
from queue import Empty
from typing import Any from typing import Any
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
@@ -119,11 +121,10 @@ class OpenCVCamera(Camera):
self.videocapture: cv2.VideoCapture | None = None self.videocapture: cv2.VideoCapture | None = None
self.thread: Thread | None = None self.process: Process | None = None
self.stop_event: Event | None = None self.stop_event: EventProcess | None = None
self.frame_lock: Lock = Lock() self.frame_queue: Queue = Queue()
self.latest_frame: NDArray[Any] | None = None self.latest_frame: NDArray[Any] | None = None
self.new_frame_event: Event = Event()
self.rotation: int | None = get_cv2_rotation(config.rotation) self.rotation: int | None = get_cv2_rotation(config.rotation)
self.backend: int = get_cv2_backend() self.backend: int = get_cv2_backend()
@@ -442,37 +443,36 @@ class OpenCVCamera(Camera):
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
color_image = self.read() color_image = self.read()
self.frame_queue.put_nowait(color_image)
with self.frame_lock:
self.latest_frame = color_image
self.new_frame_event.set()
except DeviceNotConnectedError: except DeviceNotConnectedError:
break break
except Exception as e: except Exception as e:
logger.warning(f"Error reading frame in background thread for {self}: {e}") logger.warning(f"Error reading frame in background thread for {self}: {e}")
def _start_read_thread(self) -> None: def _start_read_process(self) -> None:
"""Starts or restarts the background read thread if it's not running.""" """Starts or restarts the background read thread if it's not running."""
if self.thread is not None and self.thread.is_alive(): if self.process is not None and self.process.is_alive():
self.thread.join(timeout=0.1) self.frame_queue.join()
self.process.join()
if self.stop_event is not None: if self.stop_event is not None:
self.stop_event.set() self.stop_event.set()
self.stop_event = Event() self.stop_event = Event()
self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop") self.process = Process(target=self._read_loop, args=(), name=f"{self}_read_loop")
self.thread.daemon = True self.process.daemon = True
self.thread.start() self.process.start()
def _stop_read_thread(self) -> None: def _stop_read_thread(self) -> None:
"""Signals the background read thread to stop and waits for it to join.""" """Signals the background read thread to stop and waits for it to join."""
if self.stop_event is not None: if self.stop_event is not None:
self.stop_event.set() self.stop_event.set()
if self.thread is not None and self.thread.is_alive(): if self.process is not None and self.process.is_alive():
self.thread.join(timeout=2.0) self.frame_queue.join()
self.process.join()
self.thread = None self.process = None
self.stop_event = None self.stop_event = None
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]: def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
@@ -499,24 +499,32 @@ class OpenCVCamera(Camera):
if not self.is_connected: if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.") raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive(): if self.process is None or not self.process.is_alive():
self._start_read_thread() self._start_read_process()
if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0): if self.latest_frame is None:
thread_alive = self.thread is not None and self.thread.is_alive() self.latest_frame = self.frame_queue.get()
self.frame_queue.task_done()
return self.latest_frame
try:
frame = self.frame_queue.get(timeout=timeout_ms / 1000.0)
self.frame_queue.task_done()
except Empty:
process_alive = self.process is not None and self.process.is_alive()
if process_alive:
logger.warning(f"{self} async_read timed out after {timeout_ms} ms but camera is still running.")
return self.latest_frame
else:
raise TimeoutError( raise TimeoutError(
f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. " f"{self} async_read timed out after {timeout_ms} ms: camera is not responding !"
f"Read thread alive: {thread_alive}."
) )
with self.frame_lock:
frame = self.latest_frame
self.new_frame_event.clear()
if frame is None: if frame is None:
raise RuntimeError(f"Internal error: Event set but no frame available for {self}.") raise RuntimeError(f"Internal error: Event set but no frame available for {self}.")
else:
return frame self.latest_frame = frame
return self.latest_frame
def disconnect(self) -> None: def disconnect(self) -> None:
""" """
@@ -50,9 +50,9 @@ from typing import Any
import jsonlines import jsonlines
import pandas as pd import pandas as pd
import pyarrow.parquet as pq import pyarrow as pa
import tqdm import tqdm
from datasets import Dataset, concatenate_datasets from datasets import Dataset, Features, Image
from huggingface_hub import HfApi, snapshot_download from huggingface_hub import HfApi, snapshot_download
from requests import HTTPError from requests import HTTPError
@@ -68,7 +68,6 @@ from lerobot.datasets.utils import (
LEGACY_EPISODES_STATS_PATH, LEGACY_EPISODES_STATS_PATH,
LEGACY_TASKS_PATH, LEGACY_TASKS_PATH,
cast_stats_to_numpy, cast_stats_to_numpy,
embed_images,
flatten_dict, flatten_dict,
get_file_size_in_mb, get_file_size_in_mb,
get_parquet_file_size_in_mb, get_parquet_file_size_in_mb,
@@ -175,33 +174,25 @@ def convert_tasks(root, new_root):
write_tasks(df_tasks, new_root) write_tasks(df_tasks, new_root)
def concat_data_files( def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
paths_to_cat: list[Path], new_root: Path, chunk_idx: int, file_idx: int, image_keys: list[str] # TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
): dataframes = [pd.read_parquet(file) for file in paths_to_cat]
"""Concatenate multiple parquet data files into a single file. # Concatenate all DataFrames along rows
concatenated_df = pd.concat(dataframes, ignore_index=True)
Args:
paths_to_cat: List of parquet file paths to concatenate
new_root: Root directory for the new dataset
chunk_idx: Chunk index for the output file
file_idx: File index within the chunk
image_keys: List of feature keys that contain images
"""
datasets_list: list[Dataset] = [Dataset.from_parquet(str(file)) for file in paths_to_cat]
concatenated_ds: Dataset = concatenate_datasets(datasets_list)
if len(image_keys) > 0:
logging.debug(f"Embedding {len(image_keys)} image features for optimal training performance")
concatenated_ds = embed_images(concatenated_ds)
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx) path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
table = concatenated_ds.with_format("arrow")[:] if len(image_keys) > 0:
writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True) schema = pa.Schema.from_pandas(concatenated_df)
writer.write_table(table) features = Features.from_arrow_schema(schema)
writer.close() for key in image_keys:
features[key] = Image()
schema = features.arrow_schema
else:
schema = None
concatenated_df.to_parquet(path, index=False, schema=schema)
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int): def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):