Compare commits

...

1 Commits

Author SHA1 Message Date
CarolinePascal ee50b0f24b feat(WIP): adding support for multi-process camera video capture 2025-11-04 19:51:58 +01:00
+38 -30
View File
@@ -23,6 +23,8 @@ import platform
import time import time
from pathlib import Path from pathlib import Path
from threading import Event, Lock, Thread from threading import Event, Lock, Thread
from multiprocessing import Process, Event as EventProcess, JoinableQueue as Queue
from queue import Empty
from typing import Any from typing import Any
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
@@ -119,11 +121,10 @@ class OpenCVCamera(Camera):
self.videocapture: cv2.VideoCapture | None = None self.videocapture: cv2.VideoCapture | None = None
self.thread: Thread | None = None self.process: Process | None = None
self.stop_event: Event | None = None self.stop_event: EventProcess | None = None
self.frame_lock: Lock = Lock() self.frame_queue: Queue = Queue()
self.latest_frame: NDArray[Any] | None = None self.latest_frame: NDArray[Any] | None = None
self.new_frame_event: Event = Event()
self.rotation: int | None = get_cv2_rotation(config.rotation) self.rotation: int | None = get_cv2_rotation(config.rotation)
self.backend: int = get_cv2_backend() self.backend: int = get_cv2_backend()
@@ -442,37 +443,36 @@ class OpenCVCamera(Camera):
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
color_image = self.read() color_image = self.read()
self.frame_queue.put_nowait(color_image)
with self.frame_lock:
self.latest_frame = color_image
self.new_frame_event.set()
except DeviceNotConnectedError: except DeviceNotConnectedError:
break break
except Exception as e: except Exception as e:
logger.warning(f"Error reading frame in background thread for {self}: {e}") logger.warning(f"Error reading frame in background thread for {self}: {e}")
def _start_read_thread(self) -> None: def _start_read_process(self) -> None:
"""Starts or restarts the background read thread if it's not running.""" """Starts or restarts the background read thread if it's not running."""
if self.thread is not None and self.thread.is_alive(): if self.process is not None and self.process.is_alive():
self.thread.join(timeout=0.1) self.frame_queue.join()
self.process.join()
if self.stop_event is not None: if self.stop_event is not None:
self.stop_event.set() self.stop_event.set()
self.stop_event = Event() self.stop_event = Event()
self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop") self.process = Process(target=self._read_loop, args=(), name=f"{self}_read_loop")
self.thread.daemon = True self.process.daemon = True
self.thread.start() self.process.start()
def _stop_read_thread(self) -> None: def _stop_read_thread(self) -> None:
"""Signals the background read thread to stop and waits for it to join.""" """Signals the background read thread to stop and waits for it to join."""
if self.stop_event is not None: if self.stop_event is not None:
self.stop_event.set() self.stop_event.set()
if self.thread is not None and self.thread.is_alive(): if self.process is not None and self.process.is_alive():
self.thread.join(timeout=2.0) self.frame_queue.join()
self.process.join()
self.thread = None self.process = None
self.stop_event = None self.stop_event = None
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]: def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
@@ -499,24 +499,32 @@ class OpenCVCamera(Camera):
if not self.is_connected: if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.") raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive(): if self.process is None or not self.process.is_alive():
self._start_read_thread() self._start_read_process()
if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0): if self.latest_frame is None:
thread_alive = self.thread is not None and self.thread.is_alive() self.latest_frame = self.frame_queue.get()
raise TimeoutError( self.frame_queue.task_done()
f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. " return self.latest_frame
f"Read thread alive: {thread_alive}."
)
with self.frame_lock: try:
frame = self.latest_frame frame = self.frame_queue.get(timeout=timeout_ms / 1000.0)
self.new_frame_event.clear() self.frame_queue.task_done()
except Empty:
process_alive = self.process is not None and self.process.is_alive()
if process_alive:
logger.warning(f"{self} async_read timed out after {timeout_ms} ms but camera is still running.")
return self.latest_frame
else:
raise TimeoutError(
f"{self} async_read timed out after {timeout_ms} ms: camera is not responding !"
)
if frame is None: if frame is None:
raise RuntimeError(f"Internal error: Event set but no frame available for {self}.") raise RuntimeError(f"Internal error: Event set but no frame available for {self}.")
else:
return frame self.latest_frame = frame
return self.latest_frame
def disconnect(self) -> None: def disconnect(self) -> None:
""" """