Compare commits

..

3 Commits

Author SHA1 Message Date
Michel Aractingi 8008cb357d remove bad typing 2025-11-06 09:13:26 +01:00
Michel Aractingi ca5a4a7ae5 add typing hints 2025-11-06 09:12:09 +01:00
Michel Aractingi b5dcd70d2c add embed images in conversion to v3 script; add parquet writer in conversion script 2025-11-05 23:41:38 +01:00
2 changed files with 56 additions and 55 deletions
+29 -37
View File
@@ -23,8 +23,6 @@ import platform
import time import time
from pathlib import Path from pathlib import Path
from threading import Event, Lock, Thread from threading import Event, Lock, Thread
from multiprocessing import Process, Event as EventProcess, JoinableQueue as Queue
from queue import Empty
from typing import Any from typing import Any
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
@@ -121,10 +119,11 @@ class OpenCVCamera(Camera):
self.videocapture: cv2.VideoCapture | None = None self.videocapture: cv2.VideoCapture | None = None
self.process: Process | None = None self.thread: Thread | None = None
self.stop_event: EventProcess | None = None self.stop_event: Event | None = None
self.frame_queue: Queue = Queue() self.frame_lock: Lock = Lock()
self.latest_frame: NDArray[Any] | None = None self.latest_frame: NDArray[Any] | None = None
self.new_frame_event: Event = Event()
self.rotation: int | None = get_cv2_rotation(config.rotation) self.rotation: int | None = get_cv2_rotation(config.rotation)
self.backend: int = get_cv2_backend() self.backend: int = get_cv2_backend()
@@ -443,36 +442,37 @@ class OpenCVCamera(Camera):
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
color_image = self.read() color_image = self.read()
self.frame_queue.put_nowait(color_image)
with self.frame_lock:
self.latest_frame = color_image
self.new_frame_event.set()
except DeviceNotConnectedError: except DeviceNotConnectedError:
break break
except Exception as e: except Exception as e:
logger.warning(f"Error reading frame in background thread for {self}: {e}") logger.warning(f"Error reading frame in background thread for {self}: {e}")
def _start_read_process(self) -> None: def _start_read_thread(self) -> None:
"""Starts or restarts the background read thread if it's not running.""" """Starts or restarts the background read thread if it's not running."""
if self.process is not None and self.process.is_alive(): if self.thread is not None and self.thread.is_alive():
self.frame_queue.join() self.thread.join(timeout=0.1)
self.process.join()
if self.stop_event is not None: if self.stop_event is not None:
self.stop_event.set() self.stop_event.set()
self.stop_event = Event() self.stop_event = Event()
self.process = Process(target=self._read_loop, args=(), name=f"{self}_read_loop") self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop")
self.process.daemon = True self.thread.daemon = True
self.process.start() self.thread.start()
def _stop_read_thread(self) -> None: def _stop_read_thread(self) -> None:
"""Signals the background read thread to stop and waits for it to join.""" """Signals the background read thread to stop and waits for it to join."""
if self.stop_event is not None: if self.stop_event is not None:
self.stop_event.set() self.stop_event.set()
if self.process is not None and self.process.is_alive(): if self.thread is not None and self.thread.is_alive():
self.frame_queue.join() self.thread.join(timeout=2.0)
self.process.join()
self.process = None self.thread = None
self.stop_event = None self.stop_event = None
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]: def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
@@ -499,32 +499,24 @@ class OpenCVCamera(Camera):
if not self.is_connected: if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.") raise DeviceNotConnectedError(f"{self} is not connected.")
if self.process is None or not self.process.is_alive(): if self.thread is None or not self.thread.is_alive():
self._start_read_process() self._start_read_thread()
if self.latest_frame is None: if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0):
self.latest_frame = self.frame_queue.get() thread_alive = self.thread is not None and self.thread.is_alive()
self.frame_queue.task_done()
return self.latest_frame
try:
frame = self.frame_queue.get(timeout=timeout_ms / 1000.0)
self.frame_queue.task_done()
except Empty:
process_alive = self.process is not None and self.process.is_alive()
if process_alive:
logger.warning(f"{self} async_read timed out after {timeout_ms} ms but camera is still running.")
return self.latest_frame
else:
raise TimeoutError( raise TimeoutError(
f"{self} async_read timed out after {timeout_ms} ms: camera is not responding !" f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. "
f"Read thread alive: {thread_alive}."
) )
with self.frame_lock:
frame = self.latest_frame
self.new_frame_event.clear()
if frame is None: if frame is None:
raise RuntimeError(f"Internal error: Event set but no frame available for {self}.") raise RuntimeError(f"Internal error: Event set but no frame available for {self}.")
else:
self.latest_frame = frame return frame
return self.latest_frame
def disconnect(self) -> None: def disconnect(self) -> None:
""" """
@@ -50,9 +50,9 @@ from typing import Any
import jsonlines import jsonlines
import pandas as pd import pandas as pd
import pyarrow as pa import pyarrow.parquet as pq
import tqdm import tqdm
from datasets import Dataset, Features, Image from datasets import Dataset, concatenate_datasets
from huggingface_hub import HfApi, snapshot_download from huggingface_hub import HfApi, snapshot_download
from requests import HTTPError from requests import HTTPError
@@ -68,6 +68,7 @@ from lerobot.datasets.utils import (
LEGACY_EPISODES_STATS_PATH, LEGACY_EPISODES_STATS_PATH,
LEGACY_TASKS_PATH, LEGACY_TASKS_PATH,
cast_stats_to_numpy, cast_stats_to_numpy,
embed_images,
flatten_dict, flatten_dict,
get_file_size_in_mb, get_file_size_in_mb,
get_parquet_file_size_in_mb, get_parquet_file_size_in_mb,
@@ -174,25 +175,33 @@ def convert_tasks(root, new_root):
write_tasks(df_tasks, new_root) write_tasks(df_tasks, new_root)
def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys): def concat_data_files(
# TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets paths_to_cat: list[Path], new_root: Path, chunk_idx: int, file_idx: int, image_keys: list[str]
dataframes = [pd.read_parquet(file) for file in paths_to_cat] ):
# Concatenate all DataFrames along rows """Concatenate multiple parquet data files into a single file.
concatenated_df = pd.concat(dataframes, ignore_index=True)
Args:
paths_to_cat: List of parquet file paths to concatenate
new_root: Root directory for the new dataset
chunk_idx: Chunk index for the output file
file_idx: File index within the chunk
image_keys: List of feature keys that contain images
"""
datasets_list: list[Dataset] = [Dataset.from_parquet(str(file)) for file in paths_to_cat]
concatenated_ds: Dataset = concatenate_datasets(datasets_list)
if len(image_keys) > 0:
logging.debug(f"Embedding {len(image_keys)} image features for optimal training performance")
concatenated_ds = embed_images(concatenated_ds)
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx) path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
if len(image_keys) > 0: table = concatenated_ds.with_format("arrow")[:]
schema = pa.Schema.from_pandas(concatenated_df) writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
features = Features.from_arrow_schema(schema) writer.write_table(table)
for key in image_keys: writer.close()
features[key] = Image()
schema = features.arrow_schema
else:
schema = None
concatenated_df.to_parquet(path, index=False, schema=schema)
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int): def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):