Merge branch 'main' into feature/add-multitask-dit

This commit is contained in:
Bryson Jones
2026-01-29 10:23:39 -08:00
committed by GitHub
21 changed files with 1179 additions and 641 deletions
+2 -1
View File
@@ -81,6 +81,7 @@ def replay(cfg: ReplayConfig):
actions = dataset.hf_dataset.select_columns(ACTION)
robot.connect()
try:
log_say("Replaying episode", cfg.play_sounds, blocking=True)
for idx in range(dataset.num_frames):
start_episode_t = time.perf_counter()
@@ -97,7 +98,7 @@ def replay(cfg: ReplayConfig):
dt_s = time.perf_counter() - start_episode_t
precise_sleep(max(1 / dataset.fps - dt_s, 0.0))
finally:
robot.disconnect()
+2
View File
@@ -78,6 +78,7 @@ def main():
listener, events = init_keyboard_listener()
init_rerun(session_name="lekiwi_evaluate")
try:
if not robot.is_connected:
raise ValueError("Robot is not connected!")
@@ -131,6 +132,7 @@ def main():
dataset.save_episode()
recorded_episodes += 1
finally:
# Clean up
log_say("Stop recording")
robot.disconnect()
+2 -1
View File
@@ -74,6 +74,7 @@ def main():
listener, events = init_keyboard_listener()
init_rerun(session_name="lekiwi_record")
try:
if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected:
raise ValueError("Robot or teleop is not connected!")
@@ -125,7 +126,7 @@ def main():
# Save episode
dataset.save_episode()
recorded_episodes += 1
finally:
# Clean up
log_say("Stop recording")
robot.disconnect()
+4 -2
View File
@@ -42,6 +42,7 @@ def main():
# Connect to the robot
robot.connect()
try:
if not robot.is_connected:
raise ValueError("Robot is not connected!")
@@ -52,14 +53,15 @@ def main():
# Get recorded action from dataset
action = {
name: float(actions[idx][ACTION][i]) for i, name in enumerate(dataset.features[ACTION]["names"])
name: float(actions[idx][ACTION][i])
for i, name in enumerate(dataset.features[ACTION]["names"])
}
# Send action to robot
_ = robot.send_action(action)
precise_sleep(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0))
finally:
robot.disconnect()
+5 -2
View File
@@ -142,6 +142,7 @@ def main():
listener, events = init_keyboard_listener()
init_rerun(session_name="phone_so100_evaluate")
try:
if not robot.is_connected:
raise ValueError("Robot is not connected!")
@@ -168,7 +169,9 @@ def main():
)
# Reset the environment if not stopping or re-recording
if not events["stop_recording"] and ((episode_idx < NUM_EPISODES - 1) or events["rerecord_episode"]):
if not events["stop_recording"] and (
(episode_idx < NUM_EPISODES - 1) or events["rerecord_episode"]
):
log_say("Reset the environment")
record_loop(
robot=robot,
@@ -192,7 +195,7 @@ def main():
# Save episode
dataset.save_episode()
episode_idx += 1
finally:
# Clean up
log_say("Stop recording")
robot.disconnect()
+5 -2
View File
@@ -149,6 +149,7 @@ def main():
listener, events = init_keyboard_listener()
init_rerun(session_name="phone_so100_record")
try:
if not robot.is_connected or not phone.is_connected:
raise ValueError("Robot or teleop is not connected!")
@@ -173,7 +174,9 @@ def main():
)
# Reset the environment if not stopping or re-recording
if not events["stop_recording"] and (episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"]):
if not events["stop_recording"] and (
episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"]
):
log_say("Reset the environment")
record_loop(
robot=robot,
@@ -198,7 +201,7 @@ def main():
# Save episode
dataset.save_episode()
episode_idx += 1
finally:
# Clean up
log_say("Stop recording")
robot.disconnect()
+4 -2
View File
@@ -73,6 +73,7 @@ def main():
# Connect to the robot
robot.connect()
try:
if not robot.is_connected:
raise ValueError("Robot is not connected!")
@@ -83,7 +84,8 @@ def main():
# Get recorded action from dataset
ee_action = {
name: float(actions[idx][ACTION][i]) for i, name in enumerate(dataset.features[ACTION]["names"])
name: float(actions[idx][ACTION][i])
for i, name in enumerate(dataset.features[ACTION]["names"])
}
# Get robot observation
@@ -96,7 +98,7 @@ def main():
_ = robot.send_action(joint_action)
precise_sleep(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0))
finally:
# Clean up
robot.disconnect()
+5 -2
View File
@@ -142,6 +142,7 @@ def main():
listener, events = init_keyboard_listener()
init_rerun(session_name="so100_so100_evaluate")
try:
if not robot.is_connected:
raise ValueError("Robot is not connected!")
@@ -168,7 +169,9 @@ def main():
)
# Reset the environment if not stopping or re-recording
if not events["stop_recording"] and ((episode_idx < NUM_EPISODES - 1) or events["rerecord_episode"]):
if not events["stop_recording"] and (
(episode_idx < NUM_EPISODES - 1) or events["rerecord_episode"]
):
log_say("Reset the environment")
record_loop(
robot=robot,
@@ -192,7 +195,7 @@ def main():
# Save episode
dataset.save_episode()
episode_idx += 1
finally:
# Clean up
log_say("Stop recording")
robot.disconnect()
+5 -1
View File
@@ -146,6 +146,7 @@ def main():
listener, events = init_keyboard_listener()
init_rerun(session_name="recording_phone")
try:
if not leader.is_connected or not follower.is_connected:
raise ValueError("Robot or teleop is not connected!")
@@ -170,7 +171,9 @@ def main():
)
# Reset the environment if not stopping or re-recording
if not events["stop_recording"] and (episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"]):
if not events["stop_recording"] and (
episode_idx < NUM_EPISODES - 1 or events["rerecord_episode"]
):
log_say("Reset the environment")
record_loop(
robot=follower,
@@ -196,6 +199,7 @@ def main():
dataset.save_episode()
episode_idx += 1
finally:
# Clean up
log_say("Stop recording")
leader.disconnect()
+4 -1
View File
@@ -74,6 +74,7 @@ def main():
# Connect to the robot
robot.connect()
try:
if not robot.is_connected:
raise ValueError("Robot is not connected!")
@@ -84,7 +85,8 @@ def main():
# Get recorded action from dataset
ee_action = {
name: float(actions[idx][ACTION][i]) for i, name in enumerate(dataset.features[ACTION]["names"])
name: float(actions[idx][ACTION][i])
for i, name in enumerate(dataset.features[ACTION]["names"])
}
# Get robot observation
@@ -98,6 +100,7 @@ def main():
precise_sleep(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0))
finally:
# Clean up
robot.disconnect()
+82 -18
View File
@@ -15,11 +15,12 @@
# limitations under the License.
import abc
import warnings
from typing import Any
from numpy.typing import NDArray # type: ignore # TODO: add type stubs for numpy.typing
from .configs import CameraConfig, ColorMode
from .configs import CameraConfig
class Camera(abc.ABC):
@@ -30,20 +31,12 @@ class Camera(abc.ABC):
Manages basic camera properties (FPS, resolution) and core operations:
- Connection/disconnection
- Frame capture (sync/async)
- Frame capture (sync/async/latest)
Attributes:
fps (int | None): Configured frames per second
width (int | None): Frame width in pixels
height (int | None): Frame height in pixels
Example:
class MyCamera(Camera):
def __init__(self, config): ...
@property
def is_connected(self) -> bool: ...
def connect(self, warmup=True): ...
# Plus other required methods
"""
def __init__(self, config: CameraConfig):
@@ -56,6 +49,32 @@ class Camera(abc.ABC):
self.width: int | None = config.width
self.height: int | None = config.height
def __enter__(self):
"""
Context manager entry.
Automatically connects to the camera.
"""
self.connect()
return self
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""
Context manager exit.
Automatically disconnects, ensuring resources are released even on error.
"""
self.disconnect()
def __del__(self) -> None:
"""
Destructor safety net.
Attempts to disconnect if the object is garbage collected without cleanup.
"""
try:
if self.is_connected:
self.disconnect()
except Exception: # nosec B110
pass
@property
@abc.abstractmethod
def is_connected(self) -> bool:
@@ -89,12 +108,10 @@ class Camera(abc.ABC):
pass
@abc.abstractmethod
def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
"""Capture and return a single frame from the camera.
def read(self) -> NDArray[Any]:
"""Capture and return a single frame from the camera synchronously.
Args:
color_mode: Desired color mode for the output frame. If None,
uses the camera's default color mode.
This is a blocking call that will wait for the hardware and its SDK.
Returns:
np.ndarray: Captured frame as a numpy array.
@@ -103,17 +120,64 @@ class Camera(abc.ABC):
@abc.abstractmethod
def async_read(self, timeout_ms: float = ...) -> NDArray[Any]:
"""Asynchronously capture and return a single frame from the camera.
"""Return the most recent new frame.
This method retrieves the latest frame captured by the background thread.
If a new frame is already available in the buffer (captured since the last call),
it returns it immediately.
It blocks up to `timeout_ms` only if the buffer is empty or if the latest frame
was already consumed by a previous `async_read` call.
Essentially, this method return the latest unconsumed frame, waiting if necessary
for a new one to arrive within the specified timeout.
Usage:
- Ideal for control loops where you want to ensure every processed frame
is fresh, effectively synchronizing your loop to the camera's FPS.
- Causes of a timeout usually include: very low camera FPS, heavy processing load,
or if the camera is disconnected.
Args:
timeout_ms: Maximum time to wait for a frame in milliseconds.
Defaults to implementation-specific timeout.
timeout_ms: Maximum time to wait for a new frame in milliseconds.
Defaults to 200ms (0.2s).
Returns:
np.ndarray: Captured frame as a numpy array.
Raises:
TimeoutError: If no new frame arrives within `timeout_ms`.
"""
pass
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
"""Return the most recent frame captured immediately (Peeking).
This method is non-blocking and returns whatever is currently in the
memory buffer. The frame may be stale,
meaning it could have been captured a while ago (hanging camera scenario e.g.).
Usage:
Ideal for scenarios requiring zero latency or decoupled frequencies & when
we want a guaranteed frame, such as UI visualization, logging, or
non-critical monitoring.
Returns:
NDArray[Any]: The frame image (numpy array).
Raises:
TimeoutError: If the latest frame is older than `max_age_ms`.
NotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
warnings.warn(
f"{self.__class__.__name__}.read_latest() is not implemented. "
"Please override read_latest(); it will be required in future releases.",
FutureWarning,
stacklevel=2,
)
return self.async_read()
@abc.abstractmethod
def disconnect(self) -> None:
"""Disconnect from the camera and release resources."""
+110 -54
View File
@@ -70,34 +70,24 @@ class OpenCVCamera(Camera):
Example:
```python
from lerobot.cameras.opencv import OpenCVCamera
from lerobot.cameras.configuration_opencv import OpenCVCameraConfig, ColorMode, Cv2Rotation
from lerobot.cameras.configuration_opencv import OpenCVCameraConfig
# Basic usage with camera index 0
config = OpenCVCameraConfig(index_or_path=0)
camera = OpenCVCamera(config)
camera.connect()
# Read 1 frame synchronously
# Read 1 frame synchronously (blocking)
color_image = camera.read()
print(color_image.shape)
# Read 1 frame asynchronously
# Read 1 frame asynchronously (waits for new frame with a timeout)
async_image = camera.async_read()
# Get the latest frame immediately (no wait, returns timestamp)
latest_image, timestamp = camera.read_latest()
# When done, properly disconnect the camera using
camera.disconnect()
# Example with custom settings
custom_config = OpenCVCameraConfig(
index_or_path='/dev/video0', # Or use an index
fps=30,
width=1280,
height=720,
color_mode=ColorMode.RGB,
rotation=Cv2Rotation.ROTATE_90
)
custom_camera = OpenCVCamera(custom_config)
# ... connect, read, disconnect ...
```
"""
@@ -123,6 +113,7 @@ class OpenCVCamera(Camera):
self.stop_event: Event | None = None
self.frame_lock: Lock = Lock()
self.latest_frame: NDArray[Any] | None = None
self.latest_timestamp: float | None = None
self.new_frame_event: Event = Event()
self.rotation: int | None = get_cv2_rotation(config.rotation)
@@ -146,12 +137,16 @@ class OpenCVCamera(Camera):
Connects to the OpenCV camera specified in the configuration.
Initializes the OpenCV VideoCapture object, sets desired camera properties
(FPS, width, height), and performs initial checks.
(FPS, width, height), starts the background reading thread and performs initial checks.
Args:
warmup (bool): If True, waits at connect() time until at least one valid frame
has been captured by the background thread. Defaults to True.
Raises:
DeviceAlreadyConnectedError: If the camera is already connected.
ConnectionError: If the specified camera index/path is not found or the camera is found but fails to open.
RuntimeError: If the camera opens but fails to apply requested FPS/resolution settings.
ConnectionError: If the specified camera index/path is not found or fails to open.
RuntimeError: If the camera opens but fails to apply requested settings.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
@@ -170,12 +165,16 @@ class OpenCVCamera(Camera):
)
self._configure_capture_settings()
self._start_read_thread()
if warmup:
if warmup and self.warmup_s > 0:
start_time = time.time()
while time.time() - start_time < self.warmup_s:
self.read()
self.async_read(timeout_ms=self.warmup_s * 1000)
time.sleep(0.1)
with self.frame_lock:
if self.latest_frame is None:
raise ConnectionError(f"{self} failed to capture frames during warmup.")
logger.info(f"{self} connected.")
@@ -196,8 +195,7 @@ class OpenCVCamera(Camera):
Raises:
RuntimeError: If the camera fails to set any of the specified properties
to the requested value.
DeviceNotConnectedError: If the camera is not connected when attempting
to configure settings.
DeviceNotConnectedError: If the camera is not connected.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"Cannot configure settings for {self} as it is not connected.")
@@ -339,6 +337,17 @@ class OpenCVCamera(Camera):
return found_cameras_info
def _read_from_hardware(self) -> NDArray[Any]:
if self.videocapture is None:
raise DeviceNotConnectedError(f"{self} videocapture is not initialized")
ret, frame = self.videocapture.read()
if not ret:
raise RuntimeError(f"{self} read failed (status={ret}).")
return frame
def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
"""
Reads a single frame synchronously from the camera.
@@ -346,11 +355,6 @@ class OpenCVCamera(Camera):
This is a blocking call. It waits for the next available frame from the
camera hardware via OpenCV.
Args:
color_mode (Optional[ColorMode]): If specified, overrides the default
color mode (`self.color_mode`) for this read operation (e.g.,
request RGB even if default is BGR).
Returns:
np.ndarray: The captured frame as a NumPy array in the format
(height, width, channels), using the specified or default
@@ -362,34 +366,34 @@ class OpenCVCamera(Camera):
received frame dimensions don't match expectations before rotation.
ValueError: If an invalid `color_mode` is requested.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
start_time = time.perf_counter()
if self.videocapture is None:
raise DeviceNotConnectedError(f"{self} videocapture is not initialized")
if color_mode is not None:
logger.warning(
f"{self} read() color_mode parameter is deprecated and will be removed in future versions."
)
ret, frame = self.videocapture.read()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if not ret or frame is None:
raise RuntimeError(f"{self} read failed (status={ret}).")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
processed_frame = self._postprocess_image(frame, color_mode)
self.new_frame_event.clear()
frame = self.async_read(timeout_ms=10000)
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} read took: {read_duration_ms:.1f}ms")
return processed_frame
return frame
def _postprocess_image(self, image: NDArray[Any], color_mode: ColorMode | None = None) -> NDArray[Any]:
def _postprocess_image(self, image: NDArray[Any]) -> NDArray[Any]:
"""
Applies color conversion, dimension validation, and rotation to a raw frame.
Args:
image (np.ndarray): The raw image frame (expected BGR format from OpenCV).
color_mode (Optional[ColorMode]): The target color mode (RGB or BGR). If None,
uses the instance's default `self.color_mode`.
Returns:
np.ndarray: The processed image frame.
@@ -399,11 +403,10 @@ class OpenCVCamera(Camera):
RuntimeError: If the raw frame dimensions do not match the configured
`width` and `height`.
"""
requested_color_mode = self.color_mode if color_mode is None else color_mode
if requested_color_mode not in (ColorMode.RGB, ColorMode.BGR):
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"Invalid color mode '{requested_color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}."
f"Invalid color mode '{self.color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}."
)
h, w, c = image.shape
@@ -417,7 +420,7 @@ class OpenCVCamera(Camera):
raise RuntimeError(f"{self} frame channels={c} do not match expected 3 channels (RGB/BGR).")
processed_image = image
if requested_color_mode == ColorMode.RGB:
if self.color_mode == ColorMode.RGB:
processed_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
if self.rotation in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE, cv2.ROTATE_180]:
@@ -431,7 +434,7 @@ class OpenCVCamera(Camera):
On each iteration:
1. Reads a color frame
2. Stores result in latest_frame (thread-safe)
2. Stores result in latest_frame and updates timestamp (thread-safe)
3. Sets new_frame_event to notify listeners
Stops on DeviceNotConnectedError, logs other errors and continues.
@@ -439,30 +442,37 @@ class OpenCVCamera(Camera):
if self.stop_event is None:
raise RuntimeError(f"{self}: stop_event is not initialized before starting read loop.")
failure_count = 0
while not self.stop_event.is_set():
try:
color_image = self.read()
raw_frame = self._read_from_hardware()
processed_frame = self._postprocess_image(raw_frame)
capture_time = time.perf_counter()
with self.frame_lock:
self.latest_frame = color_image
self.latest_frame = processed_frame
self.latest_timestamp = capture_time
self.new_frame_event.set()
failure_count = 0
except DeviceNotConnectedError:
break
except Exception as e:
if failure_count <= 10:
failure_count += 1
logger.warning(f"Error reading frame in background thread for {self}: {e}")
else:
raise RuntimeError(f"{self} exceeded maximum consecutive read failures.") from e
def _start_read_thread(self) -> None:
"""Starts or restarts the background read thread if it's not running."""
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=0.1)
if self.stop_event is not None:
self.stop_event.set()
self._stop_read_thread()
self.stop_event = Event()
self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop")
self.thread.daemon = True
self.thread.start()
time.sleep(0.1)
def _stop_read_thread(self) -> None:
"""Signals the background read thread to stop and waits for it to join."""
@@ -475,6 +485,11 @@ class OpenCVCamera(Camera):
self.thread = None
self.stop_event = None
with self.frame_lock:
self.latest_frame = None
self.latest_timestamp = None
self.new_frame_event.clear()
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Reads the latest available frame asynchronously.
@@ -482,6 +497,7 @@ class OpenCVCamera(Camera):
This method retrieves the most recent frame captured by the background
read thread. It does not block waiting for the camera hardware directly,
but may wait up to timeout_ms for the background thread to provide a frame.
It is best effort under high FPS.
Args:
timeout_ms (float): Maximum time in milliseconds to wait for a frame
@@ -500,13 +516,12 @@ class OpenCVCamera(Camera):
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
self._start_read_thread()
raise RuntimeError(f"{self} read thread is not running.")
if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0):
thread_alive = self.thread is not None and self.thread.is_alive()
raise TimeoutError(
f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. "
f"Read thread alive: {thread_alive}."
f"Read thread alive: {self.thread.is_alive()}."
)
with self.frame_lock:
@@ -518,6 +533,42 @@ class OpenCVCamera(Camera):
return frame
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
"""Return the most recent frame captured immediately (Peeking).
This method is non-blocking and returns whatever is currently in the
memory buffer. The frame may be stale,
meaning it could have been captured a while ago (hanging camera scenario e.g.).
Returns:
NDArray[Any]: The frame image (numpy array).
Raises:
TimeoutError: If the latest frame is older than `max_age_ms`.
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
with self.frame_lock:
frame = self.latest_frame
timestamp = self.latest_timestamp
if frame is None or timestamp is None:
raise RuntimeError(f"{self} has not captured any frames yet.")
age_ms = (time.perf_counter() - timestamp) * 1e3
if age_ms > max_age_ms:
raise TimeoutError(
f"{self} latest frame is too old: {age_ms:.1f} ms (max allowed: {max_age_ms} ms)."
)
return frame
def disconnect(self) -> None:
"""
Disconnects from the camera and cleans up resources.
@@ -538,4 +589,9 @@ class OpenCVCamera(Camera):
self.videocapture.release()
self.videocapture = None
with self.frame_lock:
self.latest_frame = None
self.latest_timestamp = None
self.new_frame_event.clear()
logger.info(f"{self} disconnected.")
@@ -80,6 +80,8 @@ class Reachy2Camera(Camera):
self.config = config
self.color_mode = config.color_mode
self.latest_frame: NDArray[Any] | None = None
self.latest_timestamp: float | None = None
self.cam_manager: CameraManager | None = None
@@ -125,12 +127,7 @@ class Reachy2Camera(Camera):
"""
Reads a single frame synchronously from the camera.
This is a blocking call.
Args:
color_mode (Optional[ColorMode]): If specified, overrides the default
color mode (`self.color_mode`) for this read operation (e.g.,
request RGB even if default is BGR).
This method retrieves the most recent frame available in Reachy 2's low-level software.
Returns:
np.ndarray: The captured frame as a NumPy array in the format
@@ -145,6 +142,11 @@ class Reachy2Camera(Camera):
if self.cam_manager is None:
raise DeviceNotConnectedError(f"{self} is not connected.")
if color_mode is not None:
logger.warning(
f"{self} read() color_mode parameter is deprecated and will be removed in future versions."
)
frame: NDArray[Any] = np.empty((0, 0, 3), dtype=np.uint8)
if self.config.name == "teleop" and hasattr(self.cam_manager, "teleop"):
@@ -165,11 +167,18 @@ class Reachy2Camera(Camera):
raise ValueError(f"Invalid camera name '{self.config.name}'. Expected 'teleop' or 'depth'.")
if frame is None:
return np.empty((0, 0, 3), dtype=np.uint8)
raise RuntimeError(f"Internal error: No frame available for {self}.")
if self.config.color_mode == "rgb":
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"Invalid color mode '{self.color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}."
)
if self.color_mode == ColorMode.RGB:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.latest_frame = frame
self.latest_timestamp = time.perf_counter()
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} read took: {read_duration_ms:.1f}ms")
@@ -177,13 +186,7 @@ class Reachy2Camera(Camera):
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Reads the latest available frame.
This method retrieves the most recent frame available in Reachy 2's low-level software.
Args:
timeout_ms (float): Maximum time in milliseconds to wait for a frame
to become available. Defaults to 200ms (0.2 seconds).
Same as read()
Returns:
np.ndarray: The latest captured frame as a NumPy array in the format
@@ -197,12 +200,38 @@ class Reachy2Camera(Camera):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
frame = self.read()
return self.read()
if frame is None:
raise RuntimeError(f"Internal error: No frame available for {self}.")
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
"""Return the most recent frame captured immediately (Peeking).
return frame
This method is non-blocking and returns whatever is currently in the
memory buffer. The frame may be stale,
meaning it could have been captured a while ago (hanging camera scenario e.g.).
Returns:
tuple[NDArray, float]:
- The frame image (numpy array).
- The timestamp (time.perf_counter) when this frame was captured.
Raises:
TimeoutError: If the latest frame is older than `max_age_ms`.
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.latest_frame is None or self.latest_timestamp is None:
raise RuntimeError(f"{self} has not captured any frames yet.")
age_ms = (time.perf_counter() - self.latest_timestamp) * 1e3
if age_ms > max_age_ms:
raise TimeoutError(
f"{self} latest frame is too old: {age_ms:.1f} ms (max allowed: {max_age_ms} ms)."
)
return self.latest_frame
def disconnect(self) -> None:
"""
+141 -63
View File
@@ -72,15 +72,14 @@ class RealSenseCamera(Camera):
camera = RealSenseCamera(config)
camera.connect()
# Read 1 frame synchronously
# Read 1 frame synchronously (blocking)
color_image = camera.read()
print(color_image.shape)
# Read 1 frame asynchronously
# Read 1 frame asynchronously (waits for new frame with a timeout)
async_image = camera.async_read()
# When done, properly disconnect the camera using
camera.disconnect()
# Get the latest frame immediately (no wait, returns timestamp)
latest_image, timestamp = camera.read_latest()
# Example with depth capture and custom settings
custom_config = RealSenseCameraConfig(
@@ -133,7 +132,9 @@ class RealSenseCamera(Camera):
self.thread: Thread | None = None
self.stop_event: Event | None = None
self.frame_lock: Lock = Lock()
self.latest_frame: NDArray[Any] | None = None
self.latest_color_frame: NDArray[Any] | None = None
self.latest_depth_frame: NDArray[Any] | None = None
self.latest_timestamp: float | None = None
self.new_frame_event: Event = Event()
self.rotation: int | None = get_cv2_rotation(config.rotation)
@@ -158,6 +159,10 @@ class RealSenseCamera(Camera):
Initializes the RealSense pipeline, configures the required streams (color
and optionally depth), starts the pipeline, and validates the actual stream settings.
Args:
warmup (bool): If True, waits at connect() time until at least one valid frame
has been captured by the background thread. Defaults to True.
Raises:
DeviceAlreadyConnectedError: If the camera is already connected.
ValueError: If the configuration is invalid (e.g., missing serial/name, name not unique).
@@ -181,15 +186,18 @@ class RealSenseCamera(Camera):
) from e
self._configure_capture_settings()
self._start_read_thread()
# NOTE(Steven/Caroline): Enforcing at least one second of warmup as RS cameras need a bit of time before the first read. If we don't wait, the first read from the warmup will raise.
self.warmup_s = max(self.warmup_s, 1)
if warmup:
time.sleep(
1
) # NOTE(Steven): RS cameras need a bit of time to warm up before the first read. If we don't wait, the first read from the warmup will raise.
start_time = time.time()
while time.time() - start_time < self.warmup_s:
self.read()
self.async_read(timeout_ms=self.warmup_s * 1000)
time.sleep(0.1)
with self.frame_lock:
if self.latest_color_frame is None or self.use_depth and self.latest_depth_frame is None:
raise ConnectionError(f"{self} failed to capture frames during warmup.")
logger.info(f"{self} connected.")
@@ -319,9 +327,6 @@ class RealSenseCamera(Camera):
This is a blocking call. It waits for a coherent set of frames (depth)
from the camera hardware via the RealSense pipeline.
Args:
timeout_ms (int): Maximum time in milliseconds to wait for a frame. Defaults to 200ms.
Returns:
np.ndarray: The depth map as a NumPy array (height, width)
of type `np.uint16` (raw depth values in millimeters) and rotation.
@@ -330,44 +335,52 @@ class RealSenseCamera(Camera):
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If reading frames from the pipeline fails or frames are invalid.
"""
if timeout_ms:
logger.warning(
f"{self} read() timeout_ms parameter is deprecated and will be removed in future versions."
)
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if not self.use_depth:
raise RuntimeError(
f"Failed to capture depth frame '.read_depth()'. Depth stream is not enabled for {self}."
)
start_time = time.perf_counter()
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
self.new_frame_event.clear()
_ = self.async_read(timeout_ms=10000)
with self.frame_lock:
depth_map = self.latest_depth_frame
if depth_map is None:
raise RuntimeError("No depth frame available. Ensure camera is streaming.")
return depth_map
def _read_from_hardware(self):
if self.rs_pipeline is None:
raise RuntimeError(f"{self}: rs_pipeline must be initialized before use.")
ret, frame = self.rs_pipeline.try_wait_for_frames(timeout_ms=timeout_ms)
ret, frame = self.rs_pipeline.try_wait_for_frames(timeout_ms=10000)
if not ret or frame is None:
raise RuntimeError(f"{self} read_depth failed (status={ret}).")
raise RuntimeError(f"{self} read failed (status={ret}).")
depth_frame = frame.get_depth_frame()
depth_map = np.asanyarray(depth_frame.get_data())
return frame
depth_map_processed = self._postprocess_image(depth_map, depth_frame=True)
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} read took: {read_duration_ms:.1f}ms")
return depth_map_processed
def read(self, color_mode: ColorMode | None = None, timeout_ms: int = 200) -> NDArray[Any]:
def read(self, color_mode: ColorMode | None = None, timeout_ms: int = 0) -> NDArray[Any]:
"""
Reads a single frame (color) synchronously from the camera.
This is a blocking call. It waits for a coherent set of frames (color)
from the camera hardware via the RealSense pipeline.
Args:
timeout_ms (int): Maximum time in milliseconds to wait for a frame. Defaults to 200ms.
Returns:
np.ndarray: The captured color frame as a NumPy array
(height, width, channels), processed according to `color_mode` and rotation.
@@ -378,39 +391,39 @@ class RealSenseCamera(Camera):
ValueError: If an invalid `color_mode` is requested.
"""
start_time = time.perf_counter()
if color_mode is not None:
logger.warning(
f"{self} read() color_mode parameter is deprecated and will be removed in future versions."
)
if timeout_ms:
logger.warning(
f"{self} read() timeout_ms parameter is deprecated and will be removed in future versions."
)
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
start_time = time.perf_counter()
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
if self.rs_pipeline is None:
raise RuntimeError(f"{self}: rs_pipeline must be initialized before use.")
self.new_frame_event.clear()
ret, frame = self.rs_pipeline.try_wait_for_frames(timeout_ms=timeout_ms)
if not ret or frame is None:
raise RuntimeError(f"{self} read failed (status={ret}).")
color_frame = frame.get_color_frame()
color_image_raw = np.asanyarray(color_frame.get_data())
color_image_processed = self._postprocess_image(color_image_raw, color_mode)
frame = self.async_read(timeout_ms=10000)
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} read took: {read_duration_ms:.1f}ms")
return color_image_processed
return frame
def _postprocess_image(
self, image: NDArray[Any], color_mode: ColorMode | None = None, depth_frame: bool = False
) -> NDArray[Any]:
def _postprocess_image(self, image: NDArray[Any], depth_frame: bool = False) -> NDArray[Any]:
"""
Applies color conversion, dimension validation, and rotation to a raw color frame.
Args:
image (np.ndarray): The raw image frame (expected RGB format from RealSense).
color_mode (Optional[ColorMode]): The target color mode (RGB or BGR). If None,
uses the instance's default `self.color_mode`.
Returns:
np.ndarray: The processed image frame according to `self.color_mode` and `self.rotation`.
@@ -421,9 +434,9 @@ class RealSenseCamera(Camera):
`width` and `height`.
"""
if color_mode and color_mode not in (ColorMode.RGB, ColorMode.BGR):
if self.color_mode and self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
raise ValueError(
f"Invalid requested color mode '{color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}."
f"Invalid requested color mode '{self.color_mode}'. Expected {ColorMode.RGB} or {ColorMode.BGR}."
)
if depth_frame:
@@ -454,7 +467,7 @@ class RealSenseCamera(Camera):
On each iteration:
1. Reads a color frame with 500ms timeout
2. Stores result in latest_frame (thread-safe)
2. Stores result in latest_frame and updates timestamp (thread-safe)
3. Sets new_frame_event to notify listeners
Stops on DeviceNotConnectedError, logs other errors and continues.
@@ -462,25 +475,41 @@ class RealSenseCamera(Camera):
if self.stop_event is None:
raise RuntimeError(f"{self}: stop_event is not initialized before starting read loop.")
failure_count = 0
while not self.stop_event.is_set():
try:
color_image = self.read(timeout_ms=500)
frame = self._read_from_hardware()
color_frame_raw = frame.get_color_frame()
color_frame = np.asanyarray(color_frame_raw.get_data())
processed_color_frame = self._postprocess_image(color_frame)
if self.use_depth:
depth_frame_raw = frame.get_depth_frame()
depth_frame = np.asanyarray(depth_frame_raw.get_data())
processed_depth_frame = self._postprocess_image(depth_frame, depth_frame=True)
capture_time = time.perf_counter()
with self.frame_lock:
self.latest_frame = color_image
self.latest_color_frame = processed_color_frame
if self.use_depth:
self.latest_depth_frame = processed_depth_frame
self.latest_timestamp = capture_time
self.new_frame_event.set()
failure_count = 0
except DeviceNotConnectedError:
break
except Exception as e:
if failure_count <= 10:
failure_count += 1
logger.warning(f"Error reading frame in background thread for {self}: {e}")
else:
raise RuntimeError(f"{self} exceeded maximum consecutive read failures.") from e
def _start_read_thread(self) -> None:
"""Starts or restarts the background read thread if it's not running."""
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=0.1)
if self.stop_event is not None:
self.stop_event.set()
self._stop_read_thread()
self.stop_event = Event()
self.thread = Thread(target=self._read_loop, args=(), name=f"{self}_read_loop")
@@ -498,6 +527,12 @@ class RealSenseCamera(Camera):
self.thread = None
self.stop_event = None
with self.frame_lock:
self.latest_color_frame = None
self.latest_depth_frame = None
self.latest_timestamp = None
self.new_frame_event.clear()
# NOTE(Steven): Missing implementation for depth for now
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
@@ -506,6 +541,7 @@ class RealSenseCamera(Camera):
This method retrieves the most recent color frame captured by the background
read thread. It does not block waiting for the camera hardware directly,
but may wait up to timeout_ms for the background thread to provide a frame.
It is best effort under high FPS.
Args:
timeout_ms (float): Maximum time in milliseconds to wait for a frame
@@ -524,17 +560,16 @@ class RealSenseCamera(Camera):
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
self._start_read_thread()
raise RuntimeError(f"{self} read thread is not running.")
if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0):
thread_alive = self.thread is not None and self.thread.is_alive()
raise TimeoutError(
f"Timed out waiting for frame from camera {self} after {timeout_ms} ms. "
f"Read thread alive: {thread_alive}."
f"Read thread alive: {self.thread.is_alive()}."
)
with self.frame_lock:
frame = self.latest_frame
frame = self.latest_color_frame
self.new_frame_event.clear()
if frame is None:
@@ -542,6 +577,43 @@ class RealSenseCamera(Camera):
return frame
# NOTE(Steven): Missing implementation for depth for now
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
"""Return the most recent (color) frame captured immediately (Peeking).
This method is non-blocking and returns whatever is currently in the
memory buffer. The frame may be stale,
meaning it could have been captured a while ago (hanging camera scenario e.g.).
Returns:
NDArray[Any]: The frame image (numpy array).
Raises:
TimeoutError: If the latest frame is older than `max_age_ms`.
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
with self.frame_lock:
frame = self.latest_color_frame
timestamp = self.latest_timestamp
if frame is None or timestamp is None:
raise RuntimeError(f"{self} has not captured any frames yet.")
age_ms = (time.perf_counter() - timestamp) * 1e3
if age_ms > max_age_ms:
raise TimeoutError(
f"{self} latest frame is too old: {age_ms:.1f} ms (max allowed: {max_age_ms} ms)."
)
return frame
def disconnect(self) -> None:
"""
Disconnects from the camera, stops the pipeline, and cleans up resources.
@@ -565,4 +637,10 @@ class RealSenseCamera(Camera):
self.rs_pipeline = None
self.rs_profile = None
with self.frame_lock:
self.latest_color_frame = None
self.latest_depth_frame = None
self.latest_timestamp = None
self.new_frame_event.clear()
logger.info(f"{self} disconnected.")
+193 -41
View File
@@ -45,6 +45,12 @@ logger = logging.getLogger(__name__)
class ZMQCamera(Camera):
"""
Manages camera interactions via ZeroMQ for receiving frames from a remote server.
This class connects to a ZMQ Publisher, subscribes to frame topics, and decodes
incoming JSON messages containing Base64 encoded images. It supports both
synchronous and asynchronous frame reading patterns.
Example usage:
```python
from lerobot.cameras.zmq import ZMQCamera, ZMQCameraConfig
@@ -52,7 +58,16 @@ class ZMQCamera(Camera):
config = ZMQCameraConfig(server_address="192.168.123.164", port=5555, camera_name="head_camera")
camera = ZMQCamera(config)
camera.connect()
frame = camera.read()
# Read 1 frame synchronously (blocking)
color_image = camera.read()
# Read 1 frame asynchronously (waits for new frame with a timeout)
async_image = camera.async_read()
# Get the latest frame immediately (no wait, returns timestamp)
latest_image, timestamp = camera.read_latest()
camera.disconnect()
```
"""
@@ -68,14 +83,17 @@ class ZMQCamera(Camera):
self.color_mode = config.color_mode
self.timeout_ms = config.timeout_ms
# ZMQ Context and Socket
self.context: zmq.Context | None = None
self.socket: zmq.Socket | None = None
self._connected = False
# Threading resources
self.thread: Thread | None = None
self.stop_event: Event | None = None
self.frame_lock: Lock = Lock()
self.latest_frame: NDArray[Any] | None = None
self.latest_timestamp: float | None = None
self.new_frame_event: Event = Event()
def __str__(self) -> str:
@@ -83,10 +101,16 @@ class ZMQCamera(Camera):
@property
def is_connected(self) -> bool:
"""Checks if the ZMQ socket is initialized and connected."""
return self._connected and self.context is not None and self.socket is not None
def connect(self, warmup: bool = True) -> None:
"""Connect to ZMQ camera server."""
"""Connect to ZMQ camera server.
Args:
warmup (bool): If True, waits for the camera to provide at least one
valid frame before returning. Defaults to True.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} is already connected.")
@@ -103,18 +127,29 @@ class ZMQCamera(Camera):
self.socket.connect(f"tcp://{self.server_address}:{self.port}")
self._connected = True
# Auto-detect resolution
# Auto-detect resolution if not provided
if self.width is None or self.height is None:
h, w = self.read().shape[:2]
# Read directly from hardware because the thread isn't running yet
temp_frame = self._read_from_hardware()
h, w = temp_frame.shape[:2]
self.height = h
self.width = w
logger.info(f"{self} resolution: {w}x{h}")
logger.info(f"{self} resolution detected: {w}x{h}")
self._start_read_thread()
logger.info(f"{self} connected.")
if warmup:
# Ensure we have captured at least one frame via the thread
start_time = time.time()
while time.time() - start_time < (self.config.warmup_s): # Wait a bit more than timeout
self.async_read(timeout_ms=self.config.warmup_s * 1000)
time.sleep(0.1)
with self.frame_lock:
if self.latest_frame is None:
raise ConnectionError(f"{self} failed to capture frames during warmup.")
except Exception as e:
self._cleanup()
raise RuntimeError(f"Failed to connect to {self}: {e}") from e
@@ -134,12 +169,9 @@ class ZMQCamera(Camera):
"""ZMQ cameras require manual configuration (server address/port)."""
return []
def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
def _read_from_hardware(self) -> NDArray[Any]:
"""
Read a single frame from the ZMQ camera.
Returns:
np.ndarray: Decoded frame (height, width, 3)
Reads a single frame directly from the ZMQ socket.
"""
if not self.is_connected or self.socket is None:
raise DeviceNotConnectedError(f"{self} is not connected.")
@@ -147,6 +179,7 @@ class ZMQCamera(Camera):
try:
message = self.socket.recv_string()
except Exception as e:
# Check for ZMQ timeout (EAGAIN/Again) without requiring global zmq import
if type(e).__name__ == "Again":
raise TimeoutError(f"{self} timeout after {self.timeout_ms}ms") from e
raise
@@ -176,42 +209,117 @@ class ZMQCamera(Camera):
return frame
def _read_loop(self) -> None:
while self.stop_event and not self.stop_event.is_set():
try:
frame = self.read()
with self.frame_lock:
self.latest_frame = frame
self.new_frame_event.set()
except DeviceNotConnectedError:
break
except TimeoutError:
pass
except Exception as e:
logger.warning(f"Read error: {e}")
def read(self, color_mode: ColorMode | None = None) -> NDArray[Any]:
"""
Reads a single frame synchronously from the camera.
def _start_read_thread(self) -> None:
if self.thread and self.thread.is_alive():
return
self.stop_event = Event()
self.thread = Thread(target=self._read_loop, daemon=True)
self.thread.start()
This is a blocking call. It waits for the next available frame from the
camera background thread.
def _stop_read_thread(self) -> None:
if self.stop_event:
self.stop_event.set()
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2.0)
self.thread = None
self.stop_event = None
Returns:
np.ndarray: Decoded frame (height, width, 3)
"""
start_time = time.perf_counter()
if color_mode is not None:
logger.warning(
f"{self} read() color_mode parameter is deprecated and will be removed in future versions."
)
def async_read(self, timeout_ms: float = 10000) -> NDArray[Any]:
"""Read latest frame asynchronously (non-blocking)."""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if not self.thread or not self.thread.is_alive():
self._start_read_thread()
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
self.new_frame_event.clear()
frame = self.async_read(timeout_ms=10000)
read_duration_ms = (time.perf_counter() - start_time) * 1e3
logger.debug(f"{self} read took: {read_duration_ms:.1f}ms")
return frame
def _read_loop(self) -> None:
"""
Internal loop run by the background thread for asynchronous reading.
"""
if self.stop_event is None:
raise RuntimeError(f"{self}: stop_event is not initialized.")
failure_count = 0
while not self.stop_event.is_set():
try:
frame = self._read_from_hardware()
capture_time = time.perf_counter()
with self.frame_lock:
self.latest_frame = frame
self.latest_timestamp = capture_time
self.new_frame_event.set()
failure_count = 0
except DeviceNotConnectedError:
break
except (TimeoutError, Exception) as e:
if failure_count <= 10:
failure_count += 1
logger.warning(f"Read error: {e}")
else:
raise RuntimeError(f"{self} exceeded maximum consecutive read failures.") from e
def _start_read_thread(self) -> None:
if self.stop_event is not None:
self.stop_event.set()
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=2.0)
with self.frame_lock:
self.latest_frame = None
self.latest_timestamp = None
self.new_frame_event.clear()
self.stop_event = Event()
self.thread = Thread(target=self._read_loop, daemon=True, name=f"{self}_read_loop")
self.thread.start()
time.sleep(0.1)
def _stop_read_thread(self) -> None:
if self.stop_event is not None:
self.stop_event.set()
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=2.0)
self.thread = None
self.stop_event = None
with self.frame_lock:
self.latest_frame = None
self.latest_timestamp = None
self.new_frame_event.clear()
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
Reads the latest available frame asynchronously.
Args:
timeout_ms (float): Maximum time in milliseconds to wait for a frame
to become available. Defaults to 200ms.
Returns:
np.ndarray: The latest captured frame.
Raises:
DeviceNotConnectedError: If the camera is not connected.
TimeoutError: If no frame data becomes available within the specified timeout.
RuntimeError: If the background thread is not running.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0):
raise TimeoutError(f"{self} async_read timeout after {timeout_ms}ms")
@@ -225,11 +333,55 @@ class ZMQCamera(Camera):
return frame
def read_latest(self, max_age_ms: int = 1000) -> NDArray[Any]:
"""Return the most recent frame captured immediately (Peeking).
This method is non-blocking and returns whatever is currently in the
memory buffer. The frame may be stale,
meaning it could have been captured a while ago (hanging camera scenario e.g.).
Returns:
NDArray[Any]: The frame image (numpy array).
Raises:
TimeoutError: If the latest frame is older than `max_age_ms`.
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If the camera is connected but has not captured any frames yet.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
with self.frame_lock:
frame = self.latest_frame
timestamp = self.latest_timestamp
if frame is None or timestamp is None:
raise RuntimeError(f"{self} has not captured any frames yet.")
age_ms = (time.perf_counter() - timestamp) * 1e3
if age_ms > max_age_ms:
raise TimeoutError(
f"{self} latest frame is too old: {age_ms:.1f} ms (max allowed: {max_age_ms} ms)."
)
return frame
def disconnect(self) -> None:
"""Disconnect from ZMQ camera."""
if not self.is_connected and not self.thread:
if not self.is_connected and self.thread is None:
raise DeviceNotConnectedError(f"{self} not connected.")
if self.thread is not None:
self._stop_read_thread()
self._cleanup()
with self.frame_lock:
self.latest_frame = None
self.latest_timestamp = None
self.new_frame_event.clear()
logger.info(f"{self} disconnected.")
@@ -29,6 +29,7 @@ class ZMQCameraConfig(CameraConfig):
camera_name: str = "zmq_camera"
color_mode: ColorMode = ColorMode.RGB
timeout_ms: int = 5000
warmup_s: int = 1
def __post_init__(self) -> None:
if self.color_mode not in (ColorMode.RGB, ColorMode.BGR):
+3
View File
@@ -86,7 +86,10 @@ def calibrate(cfg: CalibrateConfig):
device = make_teleoperator_from_config(cfg.device)
device.connect(calibrate=False)
try:
device.calibrate()
finally:
device.disconnect()
+2 -1
View File
@@ -110,6 +110,7 @@ def replay(cfg: ReplayConfig):
robot.connect()
try:
log_say("Replaying episode", cfg.play_sounds, blocking=True)
for idx in range(len(episode_frames)):
start_episode_t = time.perf_counter()
@@ -127,7 +128,7 @@ def replay(cfg: ReplayConfig):
dt_s = time.perf_counter() - start_episode_t
precise_sleep(max(1 / dataset.fps - dt_s, 0.0))
finally:
robot.disconnect()
+110 -40
View File
@@ -20,7 +20,9 @@
# ```
from pathlib import Path
from unittest.mock import patch
import cv2
import numpy as np
import pytest
@@ -28,6 +30,50 @@ from lerobot.cameras.configs import Cv2Rotation
from lerobot.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
RealVideoCapture = cv2.VideoCapture
class MockLoopingVideoCapture:
"""
Wraps the real OpenCV VideoCapture.
Motivation: cv2.VideoCapture(file.png) is only valid for one read.
Strategy: Read the file once & return the cached frame for subsequent reads.
Consequence: No recurrent I/O operations, but we keep the test artifacts simple.
"""
def __init__(self, *args, **kwargs):
args_clean = [str(a) if isinstance(a, Path) else a for a in args]
self._real_vc = RealVideoCapture(*args_clean, **kwargs)
self._cached_frame = None
def read(self):
ret, frame = self._real_vc.read()
if ret:
self._cached_frame = frame
return ret, frame
if not ret and self._cached_frame is not None:
return True, self._cached_frame.copy()
return ret, frame
def __getattr__(self, name):
return getattr(self._real_vc, name)
@pytest.fixture(autouse=True)
def patch_opencv_videocapture():
"""
Automatically patches cv2.VideoCapture for all tests.
"""
module_path = OpenCVCamera.__module__
target = f"{module_path}.cv2.VideoCapture"
with patch(target, new=MockLoopingVideoCapture):
yield
# NOTE(Steven): more tests + assertions?
TEST_ARTIFACTS_DIR = Path(__file__).parent.parent / "artifacts" / "cameras"
DEFAULT_PNG_FILE_PATH = TEST_ARTIFACTS_DIR / "image_160x120.png"
@@ -43,25 +89,22 @@ def test_abc_implementation():
def test_connect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(warmup=False)
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
with OpenCVCamera(config) as camera:
assert camera.is_connected
def test_connect_already_connected():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(warmup=False)
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
with pytest.raises(DeviceAlreadyConnectedError):
camera.connect(warmup=False)
with OpenCVCamera(config) as camera, pytest.raises(DeviceAlreadyConnectedError):
camera.connect()
def test_connect_invalid_camera_path():
config = OpenCVCameraConfig(index_or_path="nonexistent/camera.png")
camera = OpenCVCamera(config)
with pytest.raises(ConnectionError):
@@ -74,27 +117,25 @@ def test_invalid_width_connect():
width=99999, # Invalid width to trigger error
height=480,
)
camera = OpenCVCamera(config)
camera = OpenCVCamera(config)
with pytest.raises(RuntimeError):
camera.connect(warmup=False)
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES)
def test_read(index_or_path):
config = OpenCVCameraConfig(index_or_path=index_or_path)
camera = OpenCVCamera(config)
camera.connect(warmup=False)
config = OpenCVCameraConfig(index_or_path=index_or_path, warmup_s=0)
with OpenCVCamera(config) as camera:
img = camera.read()
assert isinstance(img, np.ndarray)
def test_read_before_connect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera = OpenCVCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.read()
@@ -119,32 +160,22 @@ def test_disconnect_before_connect():
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES)
def test_async_read(index_or_path):
config = OpenCVCameraConfig(index_or_path=index_or_path)
camera = OpenCVCamera(config)
camera.connect(warmup=False)
config = OpenCVCameraConfig(index_or_path=index_or_path, warmup_s=0)
try:
with OpenCVCamera(config) as camera:
img = camera.async_read()
assert camera.thread is not None
assert camera.thread.is_alive()
assert isinstance(img, np.ndarray)
finally:
if camera.is_connected:
camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends
def test_async_read_timeout():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
camera.connect(warmup=False)
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
try:
with pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0)
finally:
if camera.is_connected:
camera.disconnect()
with OpenCVCamera(config) as camera, pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0) # consumes any available frame by then
camera.async_read(timeout_ms=0) # request immediately another one
def test_async_read_before_connect():
@@ -155,6 +186,50 @@ def test_async_read_before_connect():
_ = camera.async_read()
def test_read_latest():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
with OpenCVCamera(config) as camera:
# ensure at least one fresh frame is captured
frame = camera.read()
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == frame.shape
def test_read_latest_before_connect():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH)
camera = OpenCVCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.read_latest()
def test_read_latest_high_frequency():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
with OpenCVCamera(config) as camera:
# prime to ensure frames are available
ref = camera.read()
for _ in range(20):
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == ref.shape
def test_read_latest_too_old():
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, warmup_s=0)
with OpenCVCamera(config) as camera:
# prime to ensure frames are available
_ = camera.read()
with pytest.raises(TimeoutError):
_ = camera.read_latest(max_age_ms=0) # immediately too old
def test_fourcc_configuration():
"""Test FourCC configuration validation and application."""
@@ -181,19 +256,16 @@ def test_fourcc_configuration():
def test_fourcc_with_camera():
"""Test FourCC functionality with actual camera connection."""
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, fourcc="MJPG")
camera = OpenCVCamera(config)
config = OpenCVCameraConfig(index_or_path=DEFAULT_PNG_FILE_PATH, fourcc="MJPG", warmup_s=0)
# Connect should work with MJPG specified
camera.connect(warmup=False)
with OpenCVCamera(config) as camera:
assert camera.is_connected
# Read should work normally
img = camera.read()
assert isinstance(img, np.ndarray)
camera.disconnect()
@pytest.mark.parametrize("index_or_path", TEST_IMAGE_PATHS, ids=TEST_IMAGE_SIZES)
@pytest.mark.parametrize(
@@ -211,10 +283,8 @@ def test_rotation(rotation, index_or_path):
dimensions = filename.split("_")[-1].split(".")[0] # Assumes filenames format (_wxh.png)
original_width, original_height = map(int, dimensions.split("x"))
config = OpenCVCameraConfig(index_or_path=index_or_path, rotation=rotation)
camera = OpenCVCamera(config)
camera.connect(warmup=False)
config = OpenCVCameraConfig(index_or_path=index_or_path, rotation=rotation, warmup_s=0)
with OpenCVCamera(config) as camera:
img = camera.read()
assert isinstance(img, np.ndarray)
+38
View File
@@ -150,6 +150,44 @@ def test_async_read_before_connect(camera):
_ = camera.async_read()
def test_read_latest(camera):
camera.connect()
frame = camera.read()
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == frame.shape
def test_read_latest_before_connect(camera):
# camera fixture yields an unconnected camera instance
with pytest.raises(DeviceNotConnectedError):
_ = camera.read_latest()
def test_read_latest_high_frequency(camera):
camera.connect()
# prime to ensure frames are available
ref = camera.read()
for _ in range(20):
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == ref.shape
def test_read_latest_too_old(camera):
camera.connect()
# prime to ensure frames are available
_ = camera.read()
with pytest.raises(TimeoutError):
_ = camera.read_latest(max_age_ms=0) # immediately too old
def test_wrong_camera_name():
with pytest.raises(ValueError):
_ = Reachy2CameraConfig(name="wrong-name", image_type="left")
+55 -33
View File
@@ -62,19 +62,15 @@ def test_abc_implementation():
def test_connect():
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
config = RealSenseCameraConfig(serial_number_or_name="042", warmup_s=0)
camera.connect(warmup=False)
with RealSenseCamera(config) as camera:
assert camera.is_connected
def test_connect_already_connected():
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
camera.connect(warmup=False)
with pytest.raises(DeviceAlreadyConnectedError):
config = RealSenseCameraConfig(serial_number_or_name="042", warmup_s=0)
with RealSenseCamera(config) as camera, pytest.raises(DeviceAlreadyConnectedError):
camera.connect(warmup=False)
@@ -96,10 +92,8 @@ def test_invalid_width_connect():
def test_read():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(warmup=False)
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera:
img = camera.read()
assert isinstance(img, np.ndarray)
@@ -142,32 +136,21 @@ def test_disconnect_before_connect():
def test_async_read():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(warmup=False)
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
try:
with RealSenseCamera(config) as camera:
img = camera.async_read()
assert camera.thread is not None
assert camera.thread.is_alive()
assert isinstance(img, np.ndarray)
finally:
if camera.is_connected:
camera.disconnect() # To stop/join the thread. Otherwise get warnings when the test ends
def test_async_read_timeout():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30)
camera = RealSenseCamera(config)
camera.connect(warmup=False)
try:
with pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0)
finally:
if camera.is_connected:
camera.disconnect()
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera, pytest.raises(TimeoutError):
camera.async_read(timeout_ms=0) # consumes any available frame by then
camera.async_read(timeout_ms=0) # request immediately another one
def test_async_read_before_connect():
@@ -178,6 +161,47 @@ def test_async_read_before_connect():
_ = camera.async_read()
def test_read_latest():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera:
img = camera.read()
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == img.shape
def test_read_latest_high_frequency():
config = RealSenseCameraConfig(serial_number_or_name="042", width=640, height=480, fps=30, warmup_s=0)
with RealSenseCamera(config) as camera:
# prime with one read to ensure frames are available
ref = camera.read()
for _ in range(20):
latest = camera.read_latest()
assert isinstance(latest, np.ndarray)
assert latest.shape == ref.shape
def test_read_latest_before_connect():
config = RealSenseCameraConfig(serial_number_or_name="042")
camera = RealSenseCamera(config)
with pytest.raises(DeviceNotConnectedError):
_ = camera.read_latest()
def test_read_latest_too_old():
config = RealSenseCameraConfig(serial_number_or_name="042")
with RealSenseCamera(config) as camera:
# prime to ensure frames are available
_ = camera.read()
with pytest.raises(TimeoutError):
_ = camera.read_latest(max_age_ms=0) # immediately too old
@pytest.mark.parametrize(
"rotation",
[
@@ -189,10 +213,8 @@ def test_async_read_before_connect():
ids=["no_rot", "rot90", "rot180", "rot270"],
)
def test_rotation(rotation):
config = RealSenseCameraConfig(serial_number_or_name="042", rotation=rotation)
camera = RealSenseCamera(config)
camera.connect(warmup=False)
config = RealSenseCameraConfig(serial_number_or_name="042", rotation=rotation, warmup_s=0)
with RealSenseCamera(config) as camera:
img = camera.read()
assert isinstance(img, np.ndarray)