Compare commits

..

1 Commits

Author SHA1 Message Date
CarolinePascal ee50b0f24b feat(WIP): adding support for multi-process camera video capture 2025-11-04 19:51:58 +01:00
2 changed files with 55 additions and 56 deletions
+38 -30
View File
@@ -23,6 +23,8 @@ import platform
import time
from pathlib import Path
from threading import Event, Lock, Thread
from multiprocessing import Process, Event as EventProcess, JoinableQueue as Queue
from queue import Empty
from typing import Any
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
@@ -119,11 +121,10 @@ class OpenCVCamera(Camera):
self.videocapture: cv2.VideoCapture | None = None
self.thread: Thread | None = None
self.stop_event: Event | None = None
self.frame_lock: Lock = Lock()
self.process: Process | None = None
self.stop_event: EventProcess | None = None
self.frame_queue: Queue = Queue()
self.latest_frame: NDArray[Any] | None = None
self.new_frame_event: Event = Event()
self.rotation: int | None = get_cv2_rotation(config.rotation)
self.backend: int = get_cv2_backend()
@@ -442,37 +443,36 @@ class OpenCVCamera(Camera):
while not self.stop_event.is_set():
try:
color_image = self.read()
with self.frame_lock:
self.latest_frame = color_image
self.new_frame_event.set()
self.frame_queue.put_nowait(color_image)
except DeviceNotConnectedError:
break
except Exception as e:
logger.warning(f"Error reading frame in background thread for {self}: {e}")
def _start_read_thread(self) -> None:
def _start_read_process(self) -> None:
"""Starts or restarts the background read thread if it's not running."""
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=0.1)
if self.process is not None and self.process.is_alive():
self.frame_queue.join()
self.process.join()
if self.stop_event is not None:
self.stop_event.set()
self.stop_event = Event()
self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop")
self.thread.daemon = True
self.thread.start()
self.process = Process(target=self._read_loop, args=(), name=f"{self}_read_loop")
self.process.daemon = True
self.process.start()
def _stop_read_thread(self) -> None:
"""Signals the background read thread to stop and waits for it to join."""
if self.stop_event is not None:
self.stop_event.set()
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=2.0)
if self.process is not None and self.process.is_alive():
self.frame_queue.join()
self.process.join()
self.thread = None
self.process = None
self.stop_event = None
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
@@ -499,24 +499,32 @@ class OpenCVCamera(Camera):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
self._start_read_thread()
if self.process is None or not self.process.is_alive():
self._start_read_process()
if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0):
thread_alive = self.thread is not None and self.thread.is_alive()
raise TimeoutError(
f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. "
f"Read thread alive: {thread_alive}."
)
if self.latest_frame is None:
self.latest_frame = self.frame_queue.get()
self.frame_queue.task_done()
return self.latest_frame
with self.frame_lock:
frame = self.latest_frame
self.new_frame_event.clear()
try:
frame = self.frame_queue.get(timeout=timeout_ms / 1000.0)
self.frame_queue.task_done()
except Empty:
process_alive = self.process is not None and self.process.is_alive()
if process_alive:
logger.warning(f"{self} async_read timed out after {timeout_ms} ms but camera is still running.")
return self.latest_frame
else:
raise TimeoutError(
f"{self} async_read timed out after {timeout_ms} ms: camera is not responding !"
)
if frame is None:
raise RuntimeError(f"Internal error: Event set but no frame available for {self}.")
return frame
else:
self.latest_frame = frame
return self.latest_frame
def disconnect(self) -> None:
"""
@@ -50,9 +50,9 @@ from typing import Any
import jsonlines
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import tqdm
from datasets import Dataset, concatenate_datasets
from datasets import Dataset, Features, Image
from huggingface_hub import HfApi, snapshot_download
from requests import HTTPError
@@ -68,7 +68,6 @@ from lerobot.datasets.utils import (
LEGACY_EPISODES_STATS_PATH,
LEGACY_TASKS_PATH,
cast_stats_to_numpy,
embed_images,
flatten_dict,
get_file_size_in_mb,
get_parquet_file_size_in_mb,
@@ -175,33 +174,25 @@ def convert_tasks(root, new_root):
write_tasks(df_tasks, new_root)
def concat_data_files(
paths_to_cat: list[Path], new_root: Path, chunk_idx: int, file_idx: int, image_keys: list[str]
):
"""Concatenate multiple parquet data files into a single file.
Args:
paths_to_cat: List of parquet file paths to concatenate
new_root: Root directory for the new dataset
chunk_idx: Chunk index for the output file
file_idx: File index within the chunk
image_keys: List of feature keys that contain images
"""
datasets_list: list[Dataset] = [Dataset.from_parquet(str(file)) for file in paths_to_cat]
concatenated_ds: Dataset = concatenate_datasets(datasets_list)
if len(image_keys) > 0:
logging.debug(f"Embedding {len(image_keys)} image features for optimal training performance")
concatenated_ds = embed_images(concatenated_ds)
def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
# TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
dataframes = [pd.read_parquet(file) for file in paths_to_cat]
# Concatenate all DataFrames along rows
concatenated_df = pd.concat(dataframes, ignore_index=True)
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True)
table = concatenated_ds.with_format("arrow")[:]
writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
writer.write_table(table)
writer.close()
if len(image_keys) > 0:
schema = pa.Schema.from_pandas(concatenated_df)
features = Features.from_arrow_schema(schema)
for key in image_keys:
features[key] = Image()
schema = features.arrow_schema
else:
schema = None
concatenated_df.to_parquet(path, index=False, schema=schema)
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):