mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-18 02:00:03 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8008cb357d | |||
| ca5a4a7ae5 | |||
| b5dcd70d2c |
@@ -23,8 +23,6 @@ import platform
|
|||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from threading import Event, Lock, Thread
|
from threading import Event, Lock, Thread
|
||||||
from multiprocessing import Process, Event as EventProcess, JoinableQueue as Queue
|
|
||||||
from queue import Empty
|
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
|
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
|
||||||
@@ -121,10 +119,11 @@ class OpenCVCamera(Camera):
|
|||||||
|
|
||||||
self.videocapture: cv2.VideoCapture | None = None
|
self.videocapture: cv2.VideoCapture | None = None
|
||||||
|
|
||||||
self.process: Process | None = None
|
self.thread: Thread | None = None
|
||||||
self.stop_event: EventProcess | None = None
|
self.stop_event: Event | None = None
|
||||||
self.frame_queue: Queue = Queue()
|
self.frame_lock: Lock = Lock()
|
||||||
self.latest_frame: NDArray[Any] | None = None
|
self.latest_frame: NDArray[Any] | None = None
|
||||||
|
self.new_frame_event: Event = Event()
|
||||||
|
|
||||||
self.rotation: int | None = get_cv2_rotation(config.rotation)
|
self.rotation: int | None = get_cv2_rotation(config.rotation)
|
||||||
self.backend: int = get_cv2_backend()
|
self.backend: int = get_cv2_backend()
|
||||||
@@ -443,36 +442,37 @@ class OpenCVCamera(Camera):
|
|||||||
while not self.stop_event.is_set():
|
while not self.stop_event.is_set():
|
||||||
try:
|
try:
|
||||||
color_image = self.read()
|
color_image = self.read()
|
||||||
self.frame_queue.put_nowait(color_image)
|
|
||||||
|
with self.frame_lock:
|
||||||
|
self.latest_frame = color_image
|
||||||
|
self.new_frame_event.set()
|
||||||
|
|
||||||
except DeviceNotConnectedError:
|
except DeviceNotConnectedError:
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Error reading frame in background thread for {self}: {e}")
|
logger.warning(f"Error reading frame in background thread for {self}: {e}")
|
||||||
|
|
||||||
def _start_read_process(self) -> None:
|
def _start_read_thread(self) -> None:
|
||||||
"""Starts or restarts the background read thread if it's not running."""
|
"""Starts or restarts the background read thread if it's not running."""
|
||||||
if self.process is not None and self.process.is_alive():
|
if self.thread is not None and self.thread.is_alive():
|
||||||
self.frame_queue.join()
|
self.thread.join(timeout=0.1)
|
||||||
self.process.join()
|
|
||||||
if self.stop_event is not None:
|
if self.stop_event is not None:
|
||||||
self.stop_event.set()
|
self.stop_event.set()
|
||||||
|
|
||||||
self.stop_event = Event()
|
self.stop_event = Event()
|
||||||
self.process = Process(target=self._read_loop, args=(), name=f"{self}_read_loop")
|
self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop")
|
||||||
self.process.daemon = True
|
self.thread.daemon = True
|
||||||
self.process.start()
|
self.thread.start()
|
||||||
|
|
||||||
def _stop_read_thread(self) -> None:
|
def _stop_read_thread(self) -> None:
|
||||||
"""Signals the background read thread to stop and waits for it to join."""
|
"""Signals the background read thread to stop and waits for it to join."""
|
||||||
if self.stop_event is not None:
|
if self.stop_event is not None:
|
||||||
self.stop_event.set()
|
self.stop_event.set()
|
||||||
|
|
||||||
if self.process is not None and self.process.is_alive():
|
if self.thread is not None and self.thread.is_alive():
|
||||||
self.frame_queue.join()
|
self.thread.join(timeout=2.0)
|
||||||
self.process.join()
|
|
||||||
|
|
||||||
self.process = None
|
self.thread = None
|
||||||
self.stop_event = None
|
self.stop_event = None
|
||||||
|
|
||||||
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
|
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
|
||||||
@@ -499,32 +499,24 @@ class OpenCVCamera(Camera):
|
|||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
raise DeviceNotConnectedError(f"{self} is not connected.")
|
raise DeviceNotConnectedError(f"{self} is not connected.")
|
||||||
|
|
||||||
if self.process is None or not self.process.is_alive():
|
if self.thread is None or not self.thread.is_alive():
|
||||||
self._start_read_process()
|
self._start_read_thread()
|
||||||
|
|
||||||
if self.latest_frame is None:
|
if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0):
|
||||||
self.latest_frame = self.frame_queue.get()
|
thread_alive = self.thread is not None and self.thread.is_alive()
|
||||||
self.frame_queue.task_done()
|
raise TimeoutError(
|
||||||
return self.latest_frame
|
f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. "
|
||||||
|
f"Read thread alive: {thread_alive}."
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
with self.frame_lock:
|
||||||
frame = self.frame_queue.get(timeout=timeout_ms / 1000.0)
|
frame = self.latest_frame
|
||||||
self.frame_queue.task_done()
|
self.new_frame_event.clear()
|
||||||
except Empty:
|
|
||||||
process_alive = self.process is not None and self.process.is_alive()
|
|
||||||
if process_alive:
|
|
||||||
logger.warning(f"{self} async_read timed out after {timeout_ms} ms but camera is still running.")
|
|
||||||
return self.latest_frame
|
|
||||||
else:
|
|
||||||
raise TimeoutError(
|
|
||||||
f"{self} async_read timed out after {timeout_ms} ms: camera is not responding !"
|
|
||||||
)
|
|
||||||
|
|
||||||
if frame is None:
|
if frame is None:
|
||||||
raise RuntimeError(f"Internal error: Event set but no frame available for {self}.")
|
raise RuntimeError(f"Internal error: Event set but no frame available for {self}.")
|
||||||
else:
|
|
||||||
self.latest_frame = frame
|
return frame
|
||||||
return self.latest_frame
|
|
||||||
|
|
||||||
def disconnect(self) -> None:
|
def disconnect(self) -> None:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -50,9 +50,9 @@ from typing import Any
|
|||||||
|
|
||||||
import jsonlines
|
import jsonlines
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import pyarrow as pa
|
import pyarrow.parquet as pq
|
||||||
import tqdm
|
import tqdm
|
||||||
from datasets import Dataset, Features, Image
|
from datasets import Dataset, concatenate_datasets
|
||||||
from huggingface_hub import HfApi, snapshot_download
|
from huggingface_hub import HfApi, snapshot_download
|
||||||
from requests import HTTPError
|
from requests import HTTPError
|
||||||
|
|
||||||
@@ -68,6 +68,7 @@ from lerobot.datasets.utils import (
|
|||||||
LEGACY_EPISODES_STATS_PATH,
|
LEGACY_EPISODES_STATS_PATH,
|
||||||
LEGACY_TASKS_PATH,
|
LEGACY_TASKS_PATH,
|
||||||
cast_stats_to_numpy,
|
cast_stats_to_numpy,
|
||||||
|
embed_images,
|
||||||
flatten_dict,
|
flatten_dict,
|
||||||
get_file_size_in_mb,
|
get_file_size_in_mb,
|
||||||
get_parquet_file_size_in_mb,
|
get_parquet_file_size_in_mb,
|
||||||
@@ -174,25 +175,33 @@ def convert_tasks(root, new_root):
|
|||||||
write_tasks(df_tasks, new_root)
|
write_tasks(df_tasks, new_root)
|
||||||
|
|
||||||
|
|
||||||
def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
|
def concat_data_files(
|
||||||
# TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
|
paths_to_cat: list[Path], new_root: Path, chunk_idx: int, file_idx: int, image_keys: list[str]
|
||||||
dataframes = [pd.read_parquet(file) for file in paths_to_cat]
|
):
|
||||||
# Concatenate all DataFrames along rows
|
"""Concatenate multiple parquet data files into a single file.
|
||||||
concatenated_df = pd.concat(dataframes, ignore_index=True)
|
|
||||||
|
Args:
|
||||||
|
paths_to_cat: List of parquet file paths to concatenate
|
||||||
|
new_root: Root directory for the new dataset
|
||||||
|
chunk_idx: Chunk index for the output file
|
||||||
|
file_idx: File index within the chunk
|
||||||
|
image_keys: List of feature keys that contain images
|
||||||
|
"""
|
||||||
|
|
||||||
|
datasets_list: list[Dataset] = [Dataset.from_parquet(str(file)) for file in paths_to_cat]
|
||||||
|
concatenated_ds: Dataset = concatenate_datasets(datasets_list)
|
||||||
|
|
||||||
|
if len(image_keys) > 0:
|
||||||
|
logging.debug(f"Embedding {len(image_keys)} image features for optimal training performance")
|
||||||
|
concatenated_ds = embed_images(concatenated_ds)
|
||||||
|
|
||||||
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
|
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
|
||||||
path.parent.mkdir(parents=True, exist_ok=True)
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
if len(image_keys) > 0:
|
table = concatenated_ds.with_format("arrow")[:]
|
||||||
schema = pa.Schema.from_pandas(concatenated_df)
|
writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
|
||||||
features = Features.from_arrow_schema(schema)
|
writer.write_table(table)
|
||||||
for key in image_keys:
|
writer.close()
|
||||||
features[key] = Image()
|
|
||||||
schema = features.arrow_schema
|
|
||||||
else:
|
|
||||||
schema = None
|
|
||||||
|
|
||||||
concatenated_df.to_parquet(path, index=False, schema=schema)
|
|
||||||
|
|
||||||
|
|
||||||
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):
|
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):
|
||||||
|
|||||||
Reference in New Issue
Block a user