From ee50b0f24b535b718b44dfc5783ee309301827a3 Mon Sep 17 00:00:00 2001 From: CarolinePascal Date: Tue, 4 Nov 2025 19:51:58 +0100 Subject: [PATCH] feat(WIP): adding support for multi-process camera video capture --- src/lerobot/cameras/opencv/camera_opencv.py | 68 ++++++++++++--------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/src/lerobot/cameras/opencv/camera_opencv.py b/src/lerobot/cameras/opencv/camera_opencv.py index b1043ba64..f08ad8096 100644 --- a/src/lerobot/cameras/opencv/camera_opencv.py +++ b/src/lerobot/cameras/opencv/camera_opencv.py @@ -23,6 +23,8 @@ import platform import time from pathlib import Path from threading import Event, Lock, Thread +from multiprocessing import Process, Event as EventProcess, JoinableQueue as Queue +from queue import Empty from typing import Any from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing @@ -119,11 +121,10 @@ class OpenCVCamera(Camera): self.videocapture: cv2.VideoCapture | None = None - self.thread: Thread | None = None - self.stop_event: Event | None = None - self.frame_lock: Lock = Lock() + self.process: Process | None = None + self.stop_event: EventProcess | None = None + self.frame_queue: Queue = Queue() self.latest_frame: NDArray[Any] | None = None - self.new_frame_event: Event = Event() self.rotation: int | None = get_cv2_rotation(config.rotation) self.backend: int = get_cv2_backend() @@ -442,37 +443,36 @@ class OpenCVCamera(Camera): while not self.stop_event.is_set(): try: color_image = self.read() - - with self.frame_lock: - self.latest_frame = color_image - self.new_frame_event.set() + self.frame_queue.put_nowait(color_image) except DeviceNotConnectedError: break except Exception as e: logger.warning(f"Error reading frame in background thread for {self}: {e}") - def _start_read_thread(self) -> None: + def _start_read_process(self) -> None: """Starts or restarts the background read thread if it's not running.""" - if self.thread is not None and self.thread.is_alive(): - self.thread.join(timeout=0.1) + if self.process is not None and self.process.is_alive(): + self.frame_queue.join() + self.process.join() if self.stop_event is not None: self.stop_event.set() self.stop_event = Event() - self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop") - self.thread.daemon = True - self.thread.start() + self.process = Process(target=self._read_loop, args=(), name=f"{self}_read_loop") + self.process.daemon = True + self.process.start() def _stop_read_thread(self) -> None: """Signals the background read thread to stop and waits for it to join.""" if self.stop_event is not None: self.stop_event.set() - if self.thread is not None and self.thread.is_alive(): - self.thread.join(timeout=2.0) + if self.process is not None and self.process.is_alive(): + self.frame_queue.join() + self.process.join() - self.thread = None + self.process = None self.stop_event = None def async_read(self, timeout_ms: float = 200) -> NDArray[Any]: @@ -499,24 +499,32 @@ class OpenCVCamera(Camera): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - if self.thread is None or not self.thread.is_alive(): - self._start_read_thread() + if self.process is None or not self.process.is_alive(): + self._start_read_process() - if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0): - thread_alive = self.thread is not None and self.thread.is_alive() - raise TimeoutError( - f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. " - f"Read thread alive: {thread_alive}." - ) + if self.latest_frame is None: + self.latest_frame = self.frame_queue.get() + self.frame_queue.task_done() + return self.latest_frame - with self.frame_lock: - frame = self.latest_frame - self.new_frame_event.clear() + try: + frame = self.frame_queue.get(timeout=timeout_ms / 1000.0) + self.frame_queue.task_done() + except Empty: + process_alive = self.process is not None and self.process.is_alive() + if process_alive: + logger.warning(f"{self} async_read timed out after {timeout_ms} ms but camera is still running.") + return self.latest_frame + else: + raise TimeoutError( + f"{self} async_read timed out after {timeout_ms} ms: camera is not responding !" + ) if frame is None: raise RuntimeError(f"Internal error: Event set but no frame available for {self}.") - - return frame + else: + self.latest_frame = frame + return self.latest_frame def disconnect(self) -> None: """