Compare commits

...

48 Commits

Author SHA1 Message Date
CarolinePascal ae7e446afd docs(mermaid): fixing mermaid diagram 2026-06-12 20:01:16 +02:00
CarolinePascal f649a3b452 fix(pyav check): fixing PyAV option validation for integer codec options by normalizing
numeric values before calling `is_integer()`

Co-authored-by: Wensi (Vince) Ai <59036629+wensi-ai@users.noreply.github.com>
2026-06-12 19:52:45 +02:00
CarolinePascal 5461fadc86 chore(format): formatting code 2026-06-12 19:52:10 +02:00
CarolinePascal e0f08379e6 test(dataset tools): adding missing tests for new dataset edition tools features 2026-06-12 19:46:36 +02:00
CarolinePascal f63e9682b2 test(fix): fixing depth tests 2026-06-12 19:45:38 +02:00
CarolinePascal 5c71fdf5f4 docs(depth): improving depth maps docs 2026-06-12 19:45:19 +02:00
CarolinePascal bf2a3f3a39 chore(format): formatting code 2026-06-12 19:18:47 +02:00
CarolinePascal 6b2d5cec4c test(depth encoding): updating and cleaning video/depth encoding tests 2026-06-12 19:17:59 +02:00
CarolinePascal 291d2e982c test(depth): cleaning up depth tests 2026-06-12 19:00:17 +02:00
CarolinePascal 979f7b1187 feat(output unit): adding support for output unit specification at dataset reading/training time
Co-authored-by: Wensi (Vince) Ai <59036629+wensi-ai@users.noreply.github.com>
2026-06-12 18:34:46 +02:00
CarolinePascal 1a5fbb216f fix(depth units): fixing depth units output for the realsense cameras 2026-06-12 18:28:31 +02:00
CarolinePascal 65d4da7c1b fix(is_depth): adding missing doctrings and is_depth arguments in video decoding functions
Co-authored-by: Wensi (Vince) Ai <59036629+wensi-ai@users.noreply.github.com>
2026-06-12 18:04:45 +02:00
CarolinePascal 6610819c4a fix(typo): fixing typo 2026-05-26 18:06:52 +02:00
CarolinePascal 6ab5e667c0 fix(from_video_info): fixing early validation issue in from_video_info 2026-05-26 18:06:22 +02:00
CarolinePascal 55ff277af9 test(cleaning): cleaning up tests 2026-05-26 18:05:31 +02:00
CarolinePascal 71546887c5 test(aggregate): extending aggregation tests to depth frames 2026-05-26 17:03:30 +02:00
CarolinePascal 1cc9fe0139 feat(tools): adding depth support in LeRobotDataset edition tools 2026-05-26 17:00:12 +02:00
CarolinePascal f1859d8c65 feat(batched dequantization): optimizing dequantize_depth for torch based batched dequantization 2026-05-26 16:58:44 +02:00
CarolinePascal 3d67784f8c fix(TIFF): add missing quantization and cleanup for TIFF files 2026-05-26 15:27:09 +02:00
CarolinePascal 621f4191be fix(typo): fixing typo 2026-05-26 15:27:04 +02:00
CarolinePascal de44bc9711 fix(normalization): restricting 255 normalization to non depth/uint8 images only 2026-05-26 14:17:03 +02:00
CarolinePascal bb47f5c463 fix(realsense): fixing typo in realsense serial number 2026-05-26 14:05:17 +02:00
CarolinePascal e06d83fbfd tests(typos): fixing typos in tests 2026-05-24 23:50:30 +02:00
CarolinePascal 6179b5fd7e fix(info): fixing info metadata update when is_depth_map was set 2026-05-24 23:37:08 +02:00
CarolinePascal 72ce8b099c fix(pre-commit): fixing mutable defautl value 2026-05-24 23:37:08 +02:00
CarolinePascal 2efa8e5540 feat(refactor): refactor DepthEncoderConfig quantization pipeline, so that the methods do not live in the config class. Add pixel format - channels validation.Move the default pixel format for depth in the config file. 2026-05-24 23:37:08 +02:00
CarolinePascal aff1252b90 feat(pix_fmt channels): use PyAv to check get pixel formats number of channels 2026-05-24 23:37:08 +02:00
CarolinePascal bf78fb11d3 tests(depth): adding new tests for depth integration validation 2026-05-24 23:37:08 +02:00
CarolinePascal d8eea83a89 test(fix): fixing exisiting tests to still work with latest features 2026-05-24 23:37:08 +02:00
CarolinePascal b183571586 chore(typos): fixing typos 2026-05-24 23:37:08 +02:00
CarolinePascal 6bf4ffaabc fix(plumbing): fixing missing parts in the depth maps pipeline 2026-05-24 23:37:07 +02:00
CarolinePascal 45683aeccc fix(stop_event): fixing stop_event race condition in camera classes 2026-05-24 23:37:07 +02:00
CarolinePascal 5b91fe5d1a feat(is_depth): simplifying is_depth nested name + legacy support 2026-05-24 23:37:07 +02:00
CarolinePascal ca9fad1f12 feat(depth shape): ensuring depth maps shape is always including the channel 2026-05-24 23:37:07 +02:00
CarolinePascal efdec0ed3a chore(format): format code 2026-05-24 23:37:07 +02:00
CarolinePascal 1ba832e6e2 feat(depth maps writer): adding support for raw depth maps recording with image writer 2026-05-24 23:37:07 +02:00
CarolinePascal 44280c4c56 feat(viz): render depth observations as rr.DepthImage in Viridis 2026-05-24 23:37:07 +02:00
CarolinePascal 8825bb77e4 feat(record): plumb DepthEncoderConfig through lerobot-record 2026-05-24 23:37:07 +02:00
CarolinePascal ab6911b781 feat(robots/so_follower): emit + populate depth keys when use_depth 2026-05-24 23:37:07 +02:00
CarolinePascal 929113c0d7 feat(features): route 2D camera shapes to observation.depth.<key> 2026-05-24 23:37:06 +02:00
CarolinePascal b4e7d4c63a feat(cameras/realsense): expose async depth in metric meters 2026-05-24 23:37:06 +02:00
CarolinePascal 3f1b6f1ce1 feat(depth): wire DatasetReader to decode_depth_frames 2026-05-24 23:37:06 +02:00
CarolinePascal 226efd51f2 feat(depth): wire StreamingVideoEncoder + writer to depth encoder 2026-05-24 23:37:06 +02:00
CarolinePascal c49c7536f2 feat(depth): plumb DepthEncoderConfig through LeRobotDataset and DatasetWriter 2026-05-24 23:37:06 +02:00
CarolinePascal 93d1fac4d1 feat(depth): extend quantization tools to better fit the encoding/decoding pipeline 2026-05-24 23:37:06 +02:00
CarolinePascal 01081d2156 feat(depth): persist depth metadata 2026-05-24 23:37:06 +02:00
CarolinePascal 119169e417 feat(video): add ffv1 to supported codecs 2026-05-24 23:37:06 +02:00
CarolinePascal 7402980d7d feat(depth): add depth quantization helpers and tests 2026-05-24 23:37:05 +02:00
40 changed files with 1864 additions and 275 deletions
+8
View File
@@ -157,6 +157,14 @@ finally:
</hfoption>
</hfoptions>
### Working with depth
The Intel RealSense and Reachy 2 cameras can capture both color and depth in lockstep. Calling `read()` returns the **color** frame as `(H, W, 3)` `uint8`. Calling `read_depth()` returns the **depth map** as `(H, W, 1)` `uint16`, where each pixel value is the distance from the sensor expressed in **millimetres**. A pixel value of `0` typically means "no measurement available" (out-of-range, occluded, or low-confidence).
During recording, the control loop peeks the freshest buffered frames non-blockingly via `read_latest()` (color) and `read_latest_depth()` (depth), adding the depth map as a sibling feature (e.g. `front_depth` next to `front`).
For how depth streams are stored and encoded when recording a dataset, see the [Depth streams](./video_encoding_parameters#depth-streams) section of the video encoding guide.
## Use your phone's camera
<hfoptions id="use phone">
+72 -2
View File
@@ -65,6 +65,76 @@ All flags below are prefixed with `--dataset.camera_encoder.` on the CLI.
---
## Depth streams
Depth maps (Intel RealSense, Reachy 2) are stored as their **own video streams** alongside the RGB streams. Raw depth (`uint16` millimetres or `float32` metres) can't survive an 8-bit codec, so LeRobot **quantizes** each map to a 12-bit code (`[0, 4095]`) — logarithmically by default, to match the `1/depth` error profile of depth sensors — then packs it into a high-bit-depth pixel format (`gray12le`) and encodes it with a 12-bit codec.
```mermaid
flowchart LR
A["Raw depth (uint16 mm / float32 m)"] --> B["Clip to depth_min, depth_max"]
B --> C["Quantize to 12-bit code 04095 (log or linear)"]
C --> D["Pack into gray12le"]
D --> E["Encode video (hevc Main 12)"]
E --> F[("MP4 + metadata: depth_min/max, shift, use_log")]
F -. "load time (depth_output_unit)" .-> G["Dequantize to mm or m"]
classDef input fill:#e3f2fd,stroke:#1565c0,color:#0d47a1;
classDef encode fill:#ede7f6,stroke:#5e35b1,color:#311b92;
classDef store fill:#fff8e1,stroke:#f9a825,color:#e65100;
classDef load fill:#e8f5e9,stroke:#2e7d32,color:#1b5e20;
class A input;
class B,C,D,E encode;
class F store;
class G load;
```
Configure the depth pipeline through a parallel **`depth_encoder`** block (`DepthEncoderConfig`). It inherits every `VideoEncoderConfig` field (`vcodec`, `pix_fmt`, `crf`, …) and adds four quantizer knobs, set via `--dataset.depth_encoder.<field>`:
```bash
lerobot-record \
... \
--dataset.depth_encoder.vcodec=hevc \
--dataset.depth_encoder.depth_min=0.05 \
--dataset.depth_encoder.depth_max=5.0 \
--dataset.depth_encoder.use_log=true
```
| Parameter | Type | Default | Description |
| ----------- | ------- | ------------ | --------------------------------------------------------------------------------------------------------------------------------- |
| `vcodec` | `str` | `"hevc"` | Defaults to HEVC Main 12 (a 12-bit-capable codec). `ffv1` is a lossless alternative. |
| `pix_fmt` | `str` | `"gray12le"` | Single-channel 12-bit pixel format used to carry the quantized codes. |
| `depth_min` | `float` | `0.01` | Depth in metres mapped to quantum `0`. Values below are clipped on decode. |
| `depth_max` | `float` | `10.0` | Depth in metres mapped to quantum `4095`. Values above are clipped on decode. |
| `shift` | `float` | `3.5` | Pre-log offset (metres) used in logarithmic quantization for numerical stability near zero. Must satisfy `depth_min + shift > 0`. |
| `use_log` | `bool` | `True` | If `true`, quantize in log-space (recommended for typical depth sensors). Set to `false` for uniform/linear quantization. |
> [!TIP]
> `depth_min`, `depth_max`, and `shift` are always interpreted in **metres**, regardless of the input depth's unit. Inputs are auto-detected: integer arrays (e.g. `uint16` millimetres straight from a RealSense) are treated as millimetres, floating arrays as metres.
> Pick `depth_min` / `depth_max` to bracket the actual working range of your sensor — quanta outside that range saturate, which can crush detail at the boundaries.
Depth features are flagged with `"is_depth_map": true` in `meta/info.json`, and their quantizer settings (`video.depth_min`, `video.depth_max`, `video.shift`, `video.use_log`) are persisted — which is what lets depth be **dequantized back to physical units** on load.
### Output unit at load time
`depth_encoder` is a **record-time** concern. The unit that depth maps are dequantized to on _load_ (e.g. during training) is set separately by the read-time flag `--dataset.depth_output_unit`:
```bash
lerobot-train \
--dataset.repo_id=<my_username>/<my_dataset_name> \
--dataset.depth_output_unit=m \
--policy.type=act
```
| Parameter | Type | Default | Description |
| ------------------- | ----- | ------- | -------------------------------------------------------------------------------------------- |
| `depth_output_unit` | `str` | `"mm"` | Physical unit depth maps are dequantized to on load: `"mm"` (millimetres) or `"m"` (metres). |
> [!TIP]
> This is purely a decode-time presentation choice — it does **not** alter the stored video or its metadata, so the same dataset can be read as `mm` or `m` without re-encoding. It has no effect on datasets without depth cameras.
---
## Persistence in dataset metadata
After the first episode of a video stream is encoded, the encoder configuration is **persisted into the dataset metadata** (`meta/info.json`) under each video feature, alongside the values probed from the file itself. For a video feature `observation.images.<camera>`, the layout in `info.json` is:
@@ -82,7 +152,7 @@ After the first episode of a video stream is encoded, the encoder configuration
"video.pix_fmt": "yuv420p",
"video.fps": 30,
"video.channels": 3,
"video.is_depth_map": false,
"is_depth_map": false,
"video.g": 2,
"video.crf": 30,
"video.preset": "fast",
@@ -97,7 +167,7 @@ After the first episode of a video stream is encoded, the encoder configuration
Two sources contribute to the `info` block:
- **Stream-derived** (read back from the encoded MP4 with PyAV): `video.height`, `video.width`, `video.codec`, `video.pix_fmt`, `video.fps`, `video.channels`, `video.is_depth_map`, plus `audio.*` if an audio stream is present.
- **Stream-derived** (read back from the encoded MP4 with PyAV): `video.height`, `video.width`, `video.codec`, `video.pix_fmt`, `video.fps`, `video.channels`, `is_depth_map`, plus `audio.*` if an audio stream is present.
- **Encoder-derived** (taken from `VideoEncoderConfig`): `video.g`, `video.crf`, `video.preset`, `video.fast_decode`, `video.video_backend`, `video.extra_options`.
<Tip>
+3 -2
View File
@@ -105,8 +105,9 @@ def raw_observation_to_observation(
def prepare_image(image: torch.Tensor) -> torch.Tensor:
"""Minimal preprocessing to turn int8 images to float32 in [0, 1], and create a memory-contiguous tensor"""
image = image.type(torch.float32) / 255
"""Minimal preprocessing to turn RGB uint8 images to float32 in [0, 1], and create a memory-contiguous tensor"""
if image.dtype == torch.uint8:
image = image.type(torch.float32) / 255
image = image.contiguous()
return image
+5 -2
View File
@@ -436,7 +436,7 @@ class OpenCVCamera(Camera):
Internal loop run by the background thread for asynchronous reading.
On each iteration:
1. Reads a color frame
1. Reads a color frame (blocking call)
2. Stores result in latest_frame and updates timestamp (thread-safe)
3. Sets new_frame_event to notify listeners
@@ -445,8 +445,9 @@ class OpenCVCamera(Camera):
if self.stop_event is None:
raise RuntimeError(f"{self}: stop_event is not initialized before starting read loop.")
stop_event = self.stop_event
failure_count = 0
while not self.stop_event.is_set():
while not stop_event.is_set():
try:
raw_frame = self._read_from_hardware()
processed_frame = self._postprocess_image(raw_frame)
@@ -484,6 +485,8 @@ class OpenCVCamera(Camera):
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=2.0)
if self.thread.is_alive():
logger.warning(f"{self} read thread did not terminate within timeout.")
self.thread = None
self.stop_event = None
@@ -268,13 +268,13 @@ class RealSenseCamera(Camera):
)
if len(found_devices) > 1:
serial_numbers = [dev["serial_number"] for dev in found_devices]
serial_numbers = [dev["id"] for dev in found_devices]
raise ValueError(
f"Multiple RealSense cameras found with name '{name}'. "
f"Please use a unique serial number instead. Found SNs: {serial_numbers}"
)
serial_number = str(found_devices[0]["serial_number"])
serial_number = str(found_devices[0]["id"])
return serial_number
def _configure_rs_pipeline_config(self, rs_config: Any) -> None:
@@ -332,8 +332,8 @@ class RealSenseCamera(Camera):
from the camera hardware via the RealSense pipeline.
Returns:
np.ndarray: The depth map as a NumPy array (height, width)
of type `np.uint16` (raw depth values in millimeters) and rotation.
np.ndarray: The depth map as a NumPy array (height, width, 1)
of type `np.uint16` (raw depth values in millimeters).
Raises:
DeviceNotConnectedError: If the camera is not connected.
@@ -465,8 +465,8 @@ class RealSenseCamera(Camera):
Internal loop run by the background thread for asynchronous reading.
On each iteration:
1. Reads a color frame with 500ms timeout
2. Stores result in latest_frame and updates timestamp (thread-safe)
1. Reads a color/depth frame (blocking call with 10s timeout)
2. Stores result in latest_color_frame/latest_depth_frame and updates timestamp (thread-safe)
3. Sets new_frame_event to notify listeners
Stops on DeviceNotConnectedError, logs other errors and continues.
@@ -474,8 +474,9 @@ class RealSenseCamera(Camera):
if self.stop_event is None:
raise RuntimeError(f"{self}: stop_event is not initialized before starting read loop.")
stop_event = self.stop_event
failure_count = 0
while not self.stop_event.is_set():
while not stop_event.is_set():
try:
frame = self._read_from_hardware()
color_frame_raw = frame.get_color_frame()
@@ -486,6 +487,8 @@ class RealSenseCamera(Camera):
depth_frame_raw = frame.get_depth_frame()
depth_frame = np.asanyarray(depth_frame_raw.get_data())
processed_depth_frame = self._postprocess_image(depth_frame, depth_frame=True)
if processed_depth_frame.ndim == 2: # (H, W) -> (H, W, 1)
processed_depth_frame = processed_depth_frame[..., np.newaxis]
capture_time = time.perf_counter()
@@ -522,6 +525,8 @@ class RealSenseCamera(Camera):
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=2.0)
if self.thread.is_alive(): # pragma: no cover
logger.warning(f"{self} read thread did not terminate within timeout.")
self.thread = None
self.stop_event = None
@@ -532,7 +537,6 @@ class RealSenseCamera(Camera):
self.latest_timestamp = None
self.new_frame_event.clear()
# NOTE(Steven): Missing implementation for depth for now
@check_if_not_connected
def async_read(self, timeout_ms: float = 200) -> NDArray[Any]:
"""
@@ -575,7 +579,6 @@ class RealSenseCamera(Camera):
return frame
# NOTE(Steven): Missing implementation for depth for now
@check_if_not_connected
def read_latest(self, max_age_ms: int = 500) -> NDArray[Any]:
"""Return the most recent (color) frame captured immediately (Peeking).
@@ -611,6 +614,73 @@ class RealSenseCamera(Camera):
return frame
@check_if_not_connected
def async_read_depth(self, timeout_ms: float = 200) -> NDArray[np.uint16]:
"""Read the latest depth frame asynchronously, in millimeters.
Mirrors :meth:`async_read` but returns the depth stream rather than the
color stream. Output is ``np.uint16`` of shape ``(H, W, 1)``, where each
pixel is the distance from the sensor in millimeters.
Raises:
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If ``use_depth`` is ``False`` for this camera, or if
the background read thread is not running.
TimeoutError: If no frame becomes available within ``timeout_ms``.
"""
if not self.use_depth:
raise RuntimeError(f"{self}: cannot read depth — camera was configured with use_depth=False.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
if not self.new_frame_event.wait(timeout=timeout_ms / 1000.0):
raise TimeoutError(f"Timed out waiting for depth frame from camera {self} after {timeout_ms} ms.")
with self.frame_lock:
depth_frame = self.latest_depth_frame
self.new_frame_event.clear()
if depth_frame is None:
raise RuntimeError(f"Internal error: Event set but no depth frame available for {self}.")
return depth_frame
@check_if_not_connected
def read_latest_depth(self, max_age_ms: int = 500) -> NDArray[Any]:
"""Return the most recent depth frame in millimeters (peeking).
Non-blocking counterpart of :meth:`read_latest` for the depth stream.
Output is ``np.uint16`` of shape ``(H, W, 1)``, where each pixel is the
distance from the sensor in millimeters.
Raises:
DeviceNotConnectedError: If the camera is not connected.
RuntimeError: If ``use_depth`` is ``False`` for this camera, or if
no depth frame has been captured yet.
TimeoutError: If the latest depth frame is older than ``max_age_ms``.
"""
if not self.use_depth:
raise RuntimeError(f"{self}: cannot read depth — camera was configured with use_depth=False.")
if self.thread is None or not self.thread.is_alive():
raise RuntimeError(f"{self} read thread is not running.")
with self.frame_lock:
depth_frame = self.latest_depth_frame
timestamp = self.latest_timestamp
if depth_frame is None or timestamp is None:
raise RuntimeError(f"{self} has not captured any depth frames yet.")
age_ms = (time.perf_counter() - timestamp) * 1e3
if age_ms > max_age_ms:
raise TimeoutError(
f"{self} latest depth frame is too old: {age_ms:.1f} ms (max allowed: {max_age_ms} ms)."
)
return depth_frame
def disconnect(self) -> None:
"""
Disconnects from the camera, stops the pipeline, and cleans up resources.
+4 -1
View File
@@ -249,8 +249,9 @@ class ZMQCamera(Camera):
if self.stop_event is None:
raise RuntimeError(f"{self}: stop_event is not initialized.")
stop_event = self.stop_event
failure_count = 0
while not self.stop_event.is_set():
while not stop_event.is_set():
try:
frame = self._read_from_hardware()
capture_time = time.perf_counter()
@@ -292,6 +293,8 @@ class ZMQCamera(Camera):
if self.thread is not None and self.thread.is_alive():
self.thread.join(timeout=2.0)
if self.thread.is_alive():
logger.warning(f"{self} read thread did not terminate within timeout.")
self.thread = None
self.stop_event = None
+7
View File
@@ -35,8 +35,11 @@ from .types import (
from .video import (
VALID_VIDEO_CODECS,
VIDEO_ENCODER_INFO_KEYS,
DepthEncoderConfig,
VideoEncoderConfig,
camera_encoder_defaults,
depth_encoder_defaults,
encoder_config_from_video_info,
)
__all__ = [
@@ -57,8 +60,12 @@ __all__ = [
"WandBConfig",
"load_recipe",
"VideoEncoderConfig",
"DepthEncoderConfig",
# Defaults
"camera_encoder_defaults",
"depth_encoder_defaults",
# Factories
"encoder_config_from_video_info",
# Constants
"VALID_VIDEO_CODECS",
"VIDEO_ENCODER_INFO_KEYS",
+3 -1
View File
@@ -18,7 +18,7 @@ from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from .video import VideoEncoderConfig, camera_encoder_defaults
from .video import DepthEncoderConfig, VideoEncoderConfig, camera_encoder_defaults, depth_encoder_defaults
@dataclass
@@ -60,6 +60,8 @@ class DatasetRecordConfig:
# Video encoder settings for camera MP4s (codec, quality, GOP, etc.). Tuned via CLI nested keys,
# e.g. ``--dataset.camera_encoder.vcodec=h264`` (see ``VideoEncoderConfig``).
camera_encoder: VideoEncoderConfig = field(default_factory=camera_encoder_defaults)
# Video encoder settings for depth-map MP4s (codec, quality, GOP, etc.). Tuned via CLI nested keys.
depth_encoder: DepthEncoderConfig = field(default_factory=depth_encoder_defaults)
# Enable streaming video encoding: encode frames in real-time during capture instead
# of writing PNG images first. Makes save_episode() near-instant. More info in the documentation: https://huggingface.co/docs/lerobot/streaming_video_encoding
streaming_encoding: bool = False
+6 -1
View File
@@ -35,12 +35,17 @@ class DatasetConfig:
revision: str | None = None
use_imagenet_stats: bool = True
video_backend: str = field(default_factory=get_safe_default_video_backend)
# When True, video frames are returned as uint8 tensors (0-255) instead of float32 (0.0-1.0).
# When True, RGB video frames are returned as uint8 tensors (0-255) instead of float32 (0.0-1.0).
# This reduces memory and speeds up DataLoader IPC. The training pipeline handles the conversion.
return_uint8: bool = False
# Physical unit depth maps are dequantized to at load time: "mm" (millimetres) or "m" (metres).
# Has no effect on datasets without depth cameras.
depth_output_unit: str = "mm"
streaming: bool = False
def __post_init__(self) -> None:
if self.depth_output_unit not in ("m", "mm"):
raise ValueError(f"depth_output_unit must be 'm' or 'mm', got {self.depth_output_unit!r}")
if self.episodes is not None:
if any(ep < 0 for ep in self.episodes):
raise ValueError(
+112 -8
View File
@@ -20,7 +20,7 @@ from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any
from typing import Any, ClassVar, Self
from lerobot.utils.import_utils import require_package
@@ -36,11 +36,12 @@ HW_VIDEO_CODECS = [
"h264_vaapi", # Linux Intel/AMD
"h264_qsv", # Intel Quick Sync
]
VALID_VIDEO_CODECS: frozenset[str] = frozenset({"h264", "hevc", "libsvtav1", "auto", *HW_VIDEO_CODECS})
VALID_VIDEO_CODECS: frozenset[str] = frozenset(
{"h264", "hevc", "libsvtav1", "ffv1", "auto", *HW_VIDEO_CODECS}
)
# Aliases for legacy video codec names.
VIDEO_CODECS_ALIASES: dict[str, str] = {"av1": "libsvtav1"}
LIBSVTAV1_DEFAULT_PRESET: int = 12
# Keys persisted under ``features[*]["info"]`` as ``video.<name>`` (from :class:`VideoEncoderConfig`).
@@ -52,6 +53,19 @@ VIDEO_ENCODER_INFO_KEYS: frozenset[str] = frozenset(
f"video.{name}" for name in VIDEO_ENCODER_INFO_FIELD_NAMES
)
# Default depth quantization and encoding parameters.
DEPTH_QUANT_BITS: int = 12
DEPTH_QMAX: int = (1 << DEPTH_QUANT_BITS) - 1 # 4095
DEFAULT_DEPTH_MIN: float = 0.01
DEFAULT_DEPTH_MAX: float = 10.0
DEFAULT_DEPTH_SHIFT: float = 3.5
DEFAULT_DEPTH_USE_LOG: bool = True
DEFAULT_DEPTH_PIX_FMT: str = "gray12le"
# Depth-specific tuning fields persisted under ``features[*]["info"]`` as ``video.<name>``.
DEPTH_ENCODER_INFO_FIELD_NAMES: frozenset[str] = frozenset({"depth_min", "depth_max", "shift", "use_log"})
@dataclass
class VideoEncoderConfig:
@@ -86,6 +100,10 @@ class VideoEncoderConfig:
video_backend: str = "pyav"
extra_options: dict[str, Any] = field(default_factory=dict)
# Source-data channel count this encoder is expected to handle (3 for RGB,
# 1 for depth, etc.)
_DEFAULT_CHANNELS: ClassVar[int] = 3
def __post_init__(self) -> None:
self.resolve_vcodec()
# Empty-constructor ergonomics: ``VideoEncoderConfig()`` must "just work".
@@ -94,9 +112,9 @@ class VideoEncoderConfig:
self.validate()
@classmethod
def from_video_info(cls, video_info: dict | None) -> VideoEncoderConfig:
"""Reconstruct a :class:`VideoEncoderConfig` from a video feature's ``info`` block.
Missing or ``None`` values fall back to the class defaults.
def _kwargs_from_video_info(cls, video_info: dict | None) -> dict[str, Any]:
"""Parse the ``video.*`` keys of a feature ``info`` block into
constructor kwargs.
"""
video_info = video_info or {}
kwargs: dict[str, Any] = {}
@@ -115,7 +133,15 @@ class VideoEncoderConfig:
continue
kwargs[field_name] = value
return cls(**kwargs)
return kwargs
@classmethod
def from_video_info(cls, video_info: dict | None) -> Self:
"""Reconstruct an encoder config from a video feature's ``info`` block.
Missing or ``None`` values fall back to the class defaults.
"""
return cls(**cls._kwargs_from_video_info(video_info))
def detect_available_encoders(self, encoders: list[str] | str) -> list[str]:
"""Return the subset of available encoders based on the specified video backend.
@@ -138,7 +164,9 @@ class VideoEncoderConfig:
require_package("av", extra="dataset")
from lerobot.datasets import check_video_encoder_parameters_pyav
check_video_encoder_parameters_pyav(self.vcodec, self.pix_fmt, self.get_codec_options())
check_video_encoder_parameters_pyav(
self.vcodec, self.pix_fmt, self.get_codec_options(), channels=self._DEFAULT_CHANNELS
)
def resolve_vcodec(self) -> None:
"""Check ``vcodec`` and, when it is ``"auto"``, pick a concrete encoder.
@@ -218,6 +246,10 @@ class VideoEncoderConfig:
elif self.vcodec == "h264_qsv":
set_if("global_quality", self.crf)
set_if("preset", self.preset)
elif self.vcodec == "ffv1":
# Lossless intra-frame codec. ``crf``/``preset``/``fast_decode``
# are not meaningful.
set_if("threads", encoder_threads)
else:
set_if("crf", self.crf)
set_if("preset", self.preset)
@@ -233,3 +265,75 @@ class VideoEncoderConfig:
def camera_encoder_defaults() -> VideoEncoderConfig:
"""Return a :class:`VideoEncoderConfig` with RGB-camera defaults."""
return VideoEncoderConfig()
@dataclass
class DepthEncoderConfig(VideoEncoderConfig):
"""Encoder configuration for depth-map streams.
Inherits the full :class:`VideoEncoderConfig` surface (codec, GOP, CRF,
preset, ``extra_options``) and adds the four parameters of the depth
quantizer.
Defaults flip ``vcodec`` to ``"hevc"`` (Main 12 profile) and ``pix_fmt``
to ``"gray12le"``.
Attributes:
depth_min: Minimum depth in physical units (e.g. metres) represented
by quantum ``0``.
depth_max: Maximum depth represented by quantum :data:`DEPTH_QMAX`.
shift: Pre-log offset for numerical stability near zero.
use_log: ``True`` for logarithmic quantization (default; matches
sensor error profile), ``False`` for linear.
"""
vcodec: str = "hevc"
pix_fmt: str = "gray12le"
depth_min: float = DEFAULT_DEPTH_MIN
depth_max: float = DEFAULT_DEPTH_MAX
shift: float = DEFAULT_DEPTH_SHIFT
use_log: bool = DEFAULT_DEPTH_USE_LOG
_DEFAULT_CHANNELS: ClassVar[int] = 1
@classmethod
def _kwargs_from_video_info(cls, video_info: dict | None) -> dict[str, Any]:
"""Layer the depth-specific tuning (``depth_min`` / ``depth_max`` /
``shift`` / ``use_log``) on top of the base parser. Missing keys
fall back to the class defaults.
"""
kwargs = super()._kwargs_from_video_info(video_info)
video_info = video_info or {}
for name in DEPTH_ENCODER_INFO_FIELD_NAMES:
value = video_info.get(f"video.{name}")
if value is not None:
kwargs[name] = value
return kwargs
def depth_encoder_defaults() -> DepthEncoderConfig:
"""Return a :class:`DepthEncoderConfig` with depth-camera defaults."""
return DepthEncoderConfig()
def encoder_config_from_video_info(video_info: dict | None) -> VideoEncoderConfig:
"""Build the appropriate encoder config from a feature's ``info`` block.
Dispatches to :class:`DepthEncoderConfig` when the dict marks the feature
as a depth map and to :class:`VideoEncoderConfig`
otherwise.
Args:
video_info: A feature's ``info`` dict as persisted in ``info.json``,
or ``None`` (treated as an empty dict).
Returns:
A :class:`DepthEncoderConfig` for depth features, otherwise a
:class:`VideoEncoderConfig`.
"""
video_info = video_info or {}
is_depth = bool(video_info.get("is_depth_map") or video_info.get("video.is_depth_map"))
cls: type[VideoEncoderConfig] = DepthEncoderConfig if is_depth else VideoEncoderConfig
return cls.from_video_info(video_info)
+9 -3
View File
@@ -529,8 +529,12 @@ def compute_episode_stats(
)
if features[key]["dtype"] in ["image", "video"]:
normalization_factor = (
255.0 if not (features[key].get("info") or {}).get("is_depth_map", False) else 1.0
)
ep_stats[key] = {
k: v if k == "count" else np.squeeze(v / 255.0, axis=0) for k, v in ep_stats[key].items()
k: v if k == "count" else np.squeeze(v / normalization_factor, axis=0)
for k, v in ep_stats[key].items()
}
return ep_stats
@@ -550,8 +554,10 @@ def _validate_stat_value(value: np.ndarray, key: str, feature_key: str) -> None:
if key == "count" and value.shape != (1,):
raise ValueError(f"Shape of 'count' must be (1), but is {value.shape} instead.")
if "image" in feature_key and key != "count" and value.shape != (3, 1, 1):
raise ValueError(f"Shape of quantile '{key}' must be (3,1,1), but is {value.shape} instead.")
if "image" in feature_key and key != "count" and value.shape not in ((3, 1, 1), (1, 1, 1)):
raise ValueError(
f"Shape of quantile '{key}' must be (3,1,1) or (1,1,1) but is {value.shape} instead."
)
def _assert_type_and_shape(stats_list: list[dict[str, dict]]):
+34 -6
View File
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
from collections.abc import Callable
from collections.abc import Callable, Iterable
from pathlib import Path
import numpy as np
@@ -337,6 +337,25 @@ class LeRobotDatasetMetadata:
"""Keys to access visual modalities stored as videos."""
return [key for key, ft in self.features.items() if ft["dtype"] == "video"]
@property
def depth_keys(self) -> list[str]:
"""Keys to access depth-map modalities stored as videos or images.
A depth key is a feature whose ``info`` dict carries ``"is_depth_map": True``
(or the legacy ``"video.is_depth_map"`` inside ``info`` or ``video_info``).
"""
def _is_depth(ft: dict) -> bool:
info = ft.get("info") or {}
video_info = ft.get("video_info") or {}
return (
info.get("is_depth_map", False)
or info.get("video.is_depth_map", False)
or video_info.get("video.is_depth_map", False)
)
return [key for key, ft in self.features.items() if _is_depth(ft)]
@property
def camera_keys(self) -> list[str]:
"""Keys to access visual modalities (regardless of their storage method)."""
@@ -580,7 +599,8 @@ class LeRobotDatasetMetadata:
def update_video_info(
self,
video_key: str | None = None,
camera_encoder: VideoEncoderConfig | None = None,
video_encoder: VideoEncoderConfig | None = None,
preserve_keys: Iterable[str] | None = None,
) -> None:
"""Populate per-feature video info in ``info.json``.
@@ -590,19 +610,27 @@ class LeRobotDatasetMetadata:
Args:
video_key: If provided, only update this video key. Otherwise update
all video keys in the dataset.
camera_encoder: Encoder configuration used to produce the
video_encoder: Encoder configuration used to produce the
videos. When provided, its fields are recorded as
``video.<field>`` entries alongside the stream-derived
``video.*`` entries (see :func:`get_video_info`).
preserve_keys: Optional iterable of ``info`` keys whose existing
values must be kept as-is.
"""
if video_key is not None and video_key not in self.video_keys:
raise ValueError(f"Video key {video_key} not found in dataset")
video_keys = [video_key] if video_key is not None else self.video_keys
preserve_set = set(preserve_keys or ())
for key in video_keys:
if not self.features[key].get("info", None):
video_path = self.root / self.video_path.format(video_key=key, chunk_index=0, file_index=0)
self.info.features[key]["info"] = get_video_info(video_path, camera_encoder=camera_encoder)
existing = self.features[key].get("info") or {}
# Skip only if real video info has already been written. The ``is_depth_map`` entry (created at feature creation) is not blocking.
if set(existing.keys()) - {"is_depth_map"}:
continue
video_path = self.root / self.video_path.format(video_key=key, chunk_index=0, file_index=0)
new_info = get_video_info(video_path, video_encoder=video_encoder)
new_info = {k: v for k, v in new_info.items() if k not in preserve_set}
self.info.features[key]["info"] = {**existing, **new_info}
def update_chunk_settings(
self,
+26
View File
@@ -22,7 +22,10 @@ from pathlib import Path
import datasets
import torch
from lerobot.configs.video import DepthEncoderConfig
from .dataset_metadata import LeRobotDatasetMetadata
from .depth_utils import dequantize_depth
from .feature_utils import (
check_delta_timestamps,
get_delta_indices,
@@ -51,6 +54,7 @@ class DatasetReader:
delta_timestamps: dict[str, list[float]] | None,
image_transforms: Callable | None,
return_uint8: bool = False,
depth_output_unit: str = "mm",
):
"""Initialize the reader with metadata, filtering, and transform config.
@@ -68,6 +72,10 @@ class DatasetReader:
relative timestamp offsets for temporal context windows.
image_transforms: Optional torchvision v2 transform applied to
visual features.
return_uint8: If True, return RGB video frames as raw uint8 tensors
instead of normalized float32.
depth_output_unit: Physical unit depth maps are dequantized to
(``"m"`` or ``"mm"``). Defaults to ``"mm"``.
"""
self._meta = meta
self.root = root
@@ -76,6 +84,7 @@ class DatasetReader:
self._video_backend = video_backend
self._image_transforms = image_transforms
self._return_uint8 = return_uint8
self._depth_output_unit = depth_output_unit
self.hf_dataset: datasets.Dataset | None = None
self._absolute_to_relative_idx: dict[int, int] | None = None
@@ -86,6 +95,12 @@ class DatasetReader:
check_delta_timestamps(delta_timestamps, meta.fps, tolerance_s)
self.delta_indices = get_delta_indices(delta_timestamps, meta.fps)
##TODO(CarolinePascal): Should we rather use a more lightweight structure ?
self._depth_encoder_configs: dict[str, DepthEncoderConfig] = {
vid_key: DepthEncoderConfig.from_video_info(self._meta.features[vid_key].get("info"))
for vid_key in self._meta.depth_keys
}
def try_load(self) -> bool:
"""Attempt to load from local cache. Returns True if data is sufficient."""
try:
@@ -247,7 +262,18 @@ class DatasetReader:
self._tolerance_s,
self._video_backend,
return_uint8=self._return_uint8,
is_depth=vid_key in self._meta.depth_keys,
)
if vid_key in self._meta.depth_keys:
depth_encoder = self._depth_encoder_configs[vid_key]
frames = dequantize_depth(
frames,
depth_min=depth_encoder.depth_min,
depth_max=depth_encoder.depth_max,
shift=depth_encoder.shift,
use_log=depth_encoder.use_log,
output_unit=self._depth_output_unit,
)
return vid_key, frames.squeeze(0)
items = list(query_timestamps.items())
+62 -31
View File
@@ -36,7 +36,14 @@ import pyarrow.parquet as pq
import torch
from tqdm import tqdm
from lerobot.configs import VideoEncoderConfig, camera_encoder_defaults
from lerobot.configs import (
DepthEncoderConfig,
VideoEncoderConfig,
camera_encoder_defaults,
depth_encoder_defaults,
encoder_config_from_video_info,
)
from lerobot.configs.video import DEPTH_ENCODER_INFO_FIELD_NAMES
from lerobot.utils.constants import ACTION, HF_LEROBOT_HOME, OBS_IMAGE, OBS_STATE
from lerobot.utils.utils import flatten_dict
@@ -726,7 +733,7 @@ def _copy_and_reindex_videos(
for video_key in src_dataset.meta.video_keys:
logging.info(f"Processing videos for {video_key}")
camera_encoder = VideoEncoderConfig.from_video_info(
video_encoder = encoder_config_from_video_info(
src_dataset.meta.info.features.get(video_key, {}).get("info")
)
@@ -810,7 +817,7 @@ def _copy_and_reindex_videos(
dst_video_path,
episodes_to_keep_ranges,
src_dataset.meta.fps,
camera_encoder,
video_encoder,
)
cumulative_ts = 0.0
@@ -1190,7 +1197,10 @@ def _save_batch_episodes_images(
i, item = i_item_tuple
img = item[img_key_param]
# Use global frame index for naming
img.save(str(imgs_dir / f"frame-{base_frame_idx + i:06d}.png"), quality=100)
if img_key_param in dataset.meta.depth_keys:
img.save(str(imgs_dir / f"frame-{base_frame_idx + i:06d}.tiff"), compression="raw")
else:
img.save(str(imgs_dir / f"frame-{base_frame_idx + i:06d}.png"), quality=100)
return i
episode_durations = []
@@ -1281,7 +1291,7 @@ def _estimate_frame_size_via_calibration(
episode_indices: list[int],
temp_dir: Path,
fps: int,
camera_encoder: VideoEncoderConfig,
video_encoder: VideoEncoderConfig,
num_calibration_frames: int = 30,
) -> float:
"""Estimate MB per frame by encoding a small calibration sample.
@@ -1295,7 +1305,7 @@ def _estimate_frame_size_via_calibration(
episode_indices: List of episode indices being processed.
temp_dir: Temporary directory for calibration files.
fps: Frames per second for video encoding.
camera_encoder: Video encoder settings used for calibration encoding.
video_encoder: Video encoder settings used for calibration encoding.
num_calibration_frames: Number of frames to use for calibration (default: 30).
Returns:
@@ -1331,7 +1341,7 @@ def _estimate_frame_size_via_calibration(
imgs_dir=calibration_dir,
video_path=calibration_video_path,
fps=fps,
camera_encoder=camera_encoder,
video_encoder=video_encoder,
overwrite=True,
)
@@ -1604,6 +1614,7 @@ def recompute_stats(
raise ValueError(f"No parquet files found in {data_dir}")
all_episode_stats = []
# TODO: enable image and video stats re-computation
numeric_keys = [k for k, v in features_to_compute.items() if v["dtype"] not in ["image", "video"]]
for parquet_path in tqdm(parquet_files, desc="Computing stats from data files"):
@@ -1650,6 +1661,7 @@ def convert_image_to_video_dataset(
output_dir: Path | None = None,
repo_id: str | None = None,
camera_encoder: VideoEncoderConfig | None = None,
depth_encoder: VideoEncoderConfig | None = None,
episode_indices: list[int] | None = None,
num_workers: int = 4,
max_episodes_per_batch: int | None = None,
@@ -1676,6 +1688,8 @@ def convert_image_to_video_dataset(
"""
if camera_encoder is None:
camera_encoder = camera_encoder_defaults()
if depth_encoder is None:
depth_encoder = depth_encoder_defaults()
# Check that it's an image dataset
if len(dataset.meta.video_keys) > 0:
@@ -1700,10 +1714,7 @@ def convert_image_to_video_dataset(
logging.info(
f"Converting {len(episode_indices)} episodes with {len(img_keys)} cameras from {dataset.repo_id}"
)
logging.info(
f"Video codec: {camera_encoder.vcodec}, pixel format: {camera_encoder.pix_fmt}, "
f"GOP: {camera_encoder.g}, CRF: {camera_encoder.crf}"
)
logging.info(f"RGB video encoder: {camera_encoder}, depth video encoder: {depth_encoder}")
# Create new features dict, converting image features to video features
new_features = {}
@@ -1765,6 +1776,8 @@ def convert_image_to_video_dataset(
episode_lengths = {ep_idx: dataset.meta.episodes["length"][ep_idx] for ep_idx in episode_indices}
for img_key in tqdm(img_keys, desc="Processing cameras"):
target_encoder = depth_encoder if img_key in dataset.meta.depth_keys else camera_encoder
# Estimate size per frame by encoding a small calibration sample
# This provides accurate compression ratio for the specific codec parameters
size_per_frame_mb = _estimate_frame_size_via_calibration(
@@ -1773,7 +1786,7 @@ def convert_image_to_video_dataset(
episode_indices=episode_indices,
temp_dir=temp_dir,
fps=fps,
camera_encoder=camera_encoder,
video_encoder=target_encoder,
)
logging.info(f"Processing camera: {img_key}")
@@ -1815,7 +1828,7 @@ def convert_image_to_video_dataset(
imgs_dir=imgs_dir,
video_path=video_path,
fps=fps,
camera_encoder=camera_encoder,
video_encoder=target_encoder,
overwrite=True,
)
@@ -1862,7 +1875,7 @@ def convert_image_to_video_dataset(
video_key=img_key, chunk_index=0, file_index=0
)
new_meta.info.features[img_key]["info"] = get_video_info(
video_path, camera_encoder=camera_encoder
video_path, video_encoder=camera_encoder
)
write_info(new_meta.info, new_meta.root)
@@ -1890,11 +1903,11 @@ def convert_image_to_video_dataset(
def _reencode_video_worker(args: tuple) -> Path:
"""Picklable worker for :func:`reencode_dataset`'s process pool."""
video_path, camera_encoder, encoder_threads = args
video_path, video_encoder, encoder_threads = args
reencode_video(
input_video_path=video_path,
output_video_path=video_path,
camera_encoder=camera_encoder,
video_encoder=video_encoder,
encoder_threads=encoder_threads,
overwrite=True,
)
@@ -1903,7 +1916,8 @@ def _reencode_video_worker(args: tuple) -> Path:
def reencode_dataset(
dataset: LeRobotDataset,
camera_encoder: VideoEncoderConfig,
camera_encoder: VideoEncoderConfig | None = None,
depth_encoder: DepthEncoderConfig | None = None,
encoder_threads: int | None = None,
num_workers: int | None = None,
) -> LeRobotDataset:
@@ -1914,8 +1928,11 @@ def reencode_dataset(
Args:
dataset: An existing :class:`LeRobotDataset` whose videos will be
re-encoded.
camera_encoder: Target encoder configuration applied to every video
file.
camera_encoder: Target encoder configuration applied to every RGB video
file. If ``None``, re-encoding is skipped for RGB videos.
depth_encoder: Target encoder configuration applied to every depth video
file. If ``None``, re-encoding is skipped for depth videos.
Quantization parameters will not override the ones in the current dataset.
encoder_threads: Per-encoder thread count forwarded to
:func:`reencode_video`. ``None`` lets the codec decide.
num_workers: Number of parallel processes. ``None`` or ``0`` means
@@ -1927,23 +1944,35 @@ def reencode_dataset(
on disk.
"""
meta = dataset.meta
video_paths_list = []
video_keys_encoders_dict = {}
video_keys_paths_dict = {}
if camera_encoder is None and depth_encoder is None:
raise ValueError("Either camera_encoder or depth_encoder must be provided")
# Only re-encode if the videos are not already encoded with the given video encoding parameters
for video_key in meta.video_keys:
current_info = meta.info.features[video_key].get("info", {})
current_encoder = VideoEncoderConfig.from_video_info(current_info)
if current_encoder != camera_encoder:
video_paths_list.extend((meta.root / VIDEO_DIR / video_key).rglob("*.mp4"))
current_encoder = encoder_config_from_video_info(current_info)
target_encoder = depth_encoder if video_key in meta.depth_keys else camera_encoder
if target_encoder is None:
logging.info(f"No encoder provided for {video_key} video. Skipping re-encoding.")
elif current_encoder != target_encoder:
video_keys_paths_dict[video_key] = (meta.root / VIDEO_DIR / video_key).rglob("*.mp4")
video_keys_encoders_dict[video_key] = target_encoder
else:
logging.info(f"{video_key} videos are already encoded with {camera_encoder}. Nothing to do.")
logging.info(f"{video_key} videos are already encoded with {target_encoder}. Nothing to do.")
if len(video_paths_list) == 0:
if len(video_keys_paths_dict) == 0:
logging.warning("Dataset has no videos to re-encode.")
return dataset
logging.info(f"Re-encoding {len(video_paths_list)} video file(s) with {camera_encoder}")
logging.info(f"Re-encoding {sum(len(paths) for paths in video_keys_paths_dict.values())} video file(s).")
worker_args = [(vp, camera_encoder, encoder_threads) for vp in video_paths_list]
worker_args = [
(path, encoder, encoder_threads)
for video_key, encoder in video_keys_encoders_dict.items()
for path in video_keys_paths_dict[video_key]
]
if num_workers and num_workers > 1:
with ProcessPoolExecutor(max_workers=num_workers) as pool:
futures = [pool.submit(_reencode_video_worker, args) for args in worker_args]
@@ -1957,10 +1986,12 @@ def reencode_dataset(
for args in tqdm(worker_args, desc="Re-encoding videos"):
_reencode_video_worker(args)
# Refresh video info in metadata for every video key.
for vid_key in meta.video_keys:
video_path = meta.root / meta.get_video_file_path(0, vid_key)
meta.info.features[vid_key]["info"] = get_video_info(video_path, camera_encoder=camera_encoder)
# Refresh video info in metadata for every video key. For depth videos, preserve
# ``is_depth_map`` and the depth quantization parameters.
depth_preserve_keys = {"is_depth_map", *(f"video.{n}" for n in DEPTH_ENCODER_INFO_FIELD_NAMES)}
for video_key, encoder in video_keys_encoders_dict.items():
preserve_keys = depth_preserve_keys if video_key in meta.depth_keys else None
meta.update_video_info(video_key=video_key, video_encoder=encoder, preserve_keys=preserve_keys)
write_info(meta.info, meta.root)
logging.info("Dataset metadata updated.")
+38 -10
View File
@@ -31,7 +31,12 @@ import PIL.Image
import pyarrow.parquet as pq
import torch
from lerobot.configs import VideoEncoderConfig, camera_encoder_defaults
from lerobot.configs import (
DepthEncoderConfig,
VideoEncoderConfig,
camera_encoder_defaults,
depth_encoder_defaults,
)
from .compute_stats import compute_episode_stats
from .dataset_metadata import LeRobotDatasetMetadata
@@ -48,6 +53,7 @@ from .io_utils import (
write_info,
)
from .utils import (
DEFAULT_DEPTH_PATH,
DEFAULT_EPISODES_PATH,
DEFAULT_IMAGE_PATH,
update_chunk_file_indices,
@@ -67,17 +73,22 @@ def _encode_video_worker(
episode_index: int,
root: Path,
fps: int,
camera_encoder: VideoEncoderConfig | None = None,
video_encoder: VideoEncoderConfig | None = None,
encoder_threads: int | None = None,
) -> Path:
temp_path = Path(tempfile.mkdtemp(dir=root)) / f"{video_key}_{episode_index:03d}.mp4"
fpath = DEFAULT_IMAGE_PATH.format(image_key=video_key, episode_index=episode_index, frame_index=0)
path_template = (
DEFAULT_DEPTH_PATH
if video_encoder is not None and isinstance(video_encoder, DepthEncoderConfig)
else DEFAULT_IMAGE_PATH
)
fpath = path_template.format(image_key=video_key, episode_index=episode_index, frame_index=0)
img_dir = (root / fpath).parent
encode_video_frames(
img_dir,
temp_path,
fps,
camera_encoder=camera_encoder,
video_encoder=video_encoder,
encoder_threads=encoder_threads,
overwrite=True,
)
@@ -97,6 +108,7 @@ class DatasetWriter:
meta: LeRobotDatasetMetadata,
root: Path,
camera_encoder: VideoEncoderConfig | None,
depth_encoder: DepthEncoderConfig | None,
encoder_threads: int | None,
batch_encoding_size: int,
streaming_encoder: StreamingVideoEncoder | None = None,
@@ -110,6 +122,8 @@ class DatasetWriter:
root: Local dataset root directory.
camera_encoder: Video encoder settings applied to all cameras.
``None`` uses :func:`~lerobot.configs.camera_encoder_defaults`.
depth_encoder: Video encoder settings applied to all **depth** cameras.
``None`` uses :func:`~lerobot.configs.depth_encoder_defaults`.
encoder_threads: Number of encoder threads (global). ``None``
lets the codec decide.
batch_encoding_size: Number of episodes to accumulate before
@@ -121,6 +135,7 @@ class DatasetWriter:
self._meta = meta
self._root = root
self._camera_encoder = camera_encoder or camera_encoder_defaults()
self._depth_encoder = depth_encoder or depth_encoder_defaults()
self._encoder_threads = encoder_threads
self._batch_encoding_size = batch_encoding_size
self._streaming_encoder = streaming_encoder
@@ -145,7 +160,8 @@ class DatasetWriter:
return ep_buffer
def _get_image_file_path(self, episode_index: int, image_key: str, frame_index: int) -> Path:
fpath = DEFAULT_IMAGE_PATH.format(
path_template = DEFAULT_DEPTH_PATH if image_key in self._meta.depth_keys else DEFAULT_IMAGE_PATH
fpath = path_template.format(
image_key=image_key, episode_index=episode_index, frame_index=frame_index
)
return self._root / fpath
@@ -195,6 +211,7 @@ class DatasetWriter:
if frame_index == 0 and self._streaming_encoder is not None:
self._streaming_encoder.start_episode(
video_keys=list(self._meta.video_keys),
depth_video_keys=list(self._meta.depth_keys),
temp_dir=self._root,
)
@@ -282,10 +299,13 @@ class DatasetWriter:
if use_streaming:
streaming_results = self._streaming_encoder.finish_episode()
for video_key in self._meta.video_keys:
normalization_factor = 255.0 if video_key not in self._meta.depth_keys else 1.0
temp_path, video_stats = streaming_results[video_key]
if video_stats is not None:
ep_stats[video_key] = {
k: v if k == "count" else np.squeeze(v.reshape(1, -1, 1, 1) / 255.0, axis=0)
k: v
if k == "count"
else np.squeeze(v.reshape(1, -1, 1, 1) / normalization_factor, axis=0)
for k, v in video_stats.items()
}
ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path))
@@ -300,7 +320,9 @@ class DatasetWriter:
episode_index,
self._root,
self._meta.fps,
self._camera_encoder,
self._depth_encoder
if video_key in self._meta.depth_keys
else self._camera_encoder,
self._encoder_threads,
): video_key
for video_key in self._meta.video_keys
@@ -511,7 +533,12 @@ class DatasetWriter:
# Update video info (only needed when first episode is encoded)
if episode_index == 0:
self._meta.update_video_info(video_key, camera_encoder=self._camera_encoder)
self._meta.update_video_info(
video_key,
video_encoder=self._depth_encoder
if video_key in self._meta.depth_keys
else self._camera_encoder,
)
write_info(self._meta.info, self._meta.root)
metadata = {
@@ -578,13 +605,14 @@ class DatasetWriter:
self.image_writer.wait_until_done()
def _encode_temporary_episode_video(self, video_key: str, episode_index: int) -> Path:
"""Use ffmpeg to convert frames stored as png into mp4 videos."""
"""Use ffmpeg to convert frames stored as png/tiff into mp4 videos."""
is_depth = video_key in self._meta.depth_keys
return _encode_video_worker(
video_key,
episode_index,
self._root,
self._meta.fps,
self._camera_encoder,
self._depth_encoder if is_depth else self._camera_encoder,
self._encoder_threads,
)
+256
View File
@@ -0,0 +1,256 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Depth encoding/decoding helpers for :class:`VideoEncoderConfig`.
"""
import math
from typing import Literal
import av
import numpy as np
import torch
from numpy.typing import NDArray
from lerobot.configs.video import (
DEFAULT_DEPTH_MAX,
DEFAULT_DEPTH_MIN,
DEFAULT_DEPTH_PIX_FMT,
DEFAULT_DEPTH_SHIFT,
DEFAULT_DEPTH_USE_LOG,
DEPTH_QMAX,
)
from .pyav_utils import write_u16_plane
_MM_PER_METRE = 1000.0
_UINT16_MAX = 65535
def _validate_log_quant_params(depth_min: float, shift: float) -> None:
"""Ensure ``log(depth_min + shift)`` is finite."""
if depth_min + shift <= 0:
raise ValueError(
f"depth_min + shift must be positive for logarithmic quantization, "
f"got depth_min={depth_min} + shift={shift} = {depth_min + shift}"
)
def _depth_input_to_float32_and_unit(
depth: NDArray[np.integer] | NDArray[np.floating],
input_unit: Literal["auto", "m", "mm"],
) -> tuple[NDArray[np.float32], Literal["m", "mm"]]:
"""Convert depth to float32 in the chosen unit, and return the resolved unit."""
resolved_unit = (
("m" if np.issubdtype(depth.dtype, np.floating) else "mm") if input_unit == "auto" else input_unit
)
return depth.astype(np.float32, order="K"), resolved_unit
def quantize_depth(
depth: NDArray[np.uint16] | NDArray[np.float32] | torch.Tensor,
depth_min: float = DEFAULT_DEPTH_MIN,
depth_max: float = DEFAULT_DEPTH_MAX,
shift: float = DEFAULT_DEPTH_SHIFT,
use_log: bool = DEFAULT_DEPTH_USE_LOG,
pix_fmt: str = DEFAULT_DEPTH_PIX_FMT,
video_backend: str | None = "pyav",
input_unit: Literal["auto", "m", "mm"] = "auto",
) -> NDArray[np.uint16] | av.VideoFrame:
"""Quantize depth to 12-bit codes (``uint16``, values ``0…DEPTH_QMAX``).
Depth maps are packed into 12-bit integer frames so they fit in standard
high-bit-depth pixel formats (e.g. ``yuv420p12le`` / ``gray12le``)
and can be encoded by widely supported video codecs (HEVC Main 12, ffv1).
Logarithmic quantization is the default because it allocates more quanta
to near-range depth, which matches the (1/depth) error profile of typical
depth sensors. Math is ported from BEHAVIOR-1K's ``obs_utils.py``.
**Input units**:
- ``input_unit="auto"`` (default): infer from dtype (floating = m, non-floating = mm).
- ``input_unit="mm"``: interpret input values as millimetres.
- ``input_unit="m"``: interpret input values as metres.
Quantization math runs in the **resolved input unit**.
``depth_min``, ``depth_max``, and ``shift`` are always in **metres**.
Args:
depth: Depth map; ``torch.Tensor`` is moved to CPU for conversion.
depth_min: Depth (metres) at quantum ``0``.
depth_max: Depth (metres) at quantum :data:`DEPTH_QMAX`.
shift: Depth shift (metres); used in log mode. Must satisfy ``depth_min + shift > 0``.
use_log: If ``True`` (default), quantize in log space.
video_backend: Video backend to use for encoding. Defaults to "pyav".
input_unit: Input unit policy (``"auto"``, ``"mm"``, ``"m"``).
Returns:
``numpy.ndarray``, ``dtype=uint16``, same shape as ``depth``, values in
``[0, DEPTH_QMAX]``.
Raises:
ValueError: If ``input_unit`` is not ``"auto"``, ``"mm"``, or ``"m"``.
ValueError: If ``use_log=True`` and ``depth_min + shift <= 0``.
"""
if input_unit not in ("auto", "m", "mm"):
raise ValueError(f"input_unit must be 'auto', 'm', or 'mm', got {input_unit!r}")
if isinstance(depth, torch.Tensor):
depth = depth.detach().cpu().numpy()
# Squeeze single-channel dim: (H, W, 1) or (1, H, W) → (H, W)
if depth.ndim == 3 and (depth.shape[-1] == 1 or depth.shape[0] == 1):
depth = depth.squeeze()
depth_f, resolved_unit = _depth_input_to_float32_and_unit(depth, input_unit=input_unit)
# Convert depth_min, depth_max, and shift to the resolved input unit.
depth_min_u = np.float32(depth_min) if resolved_unit == "m" else np.float32(depth_min * _MM_PER_METRE)
depth_max_u = np.float32(depth_max) if resolved_unit == "m" else np.float32(depth_max * _MM_PER_METRE)
shift_u = np.float32(shift) if resolved_unit == "m" else np.float32(shift * _MM_PER_METRE)
# Normalization and quantization is performed in the resolved input unit.
if use_log:
_validate_log_quant_params(depth_min, shift)
log_min = math.log(float(depth_min_u + shift_u))
log_max = math.log(float(depth_max_u + shift_u))
norm = (np.log(depth_f + shift_u) - log_min) / (log_max - log_min)
else:
norm = (depth_f - depth_min_u) / (depth_max_u - depth_min_u)
quantized = np.rint(norm * DEPTH_QMAX).clip(0, DEPTH_QMAX).astype(np.uint16, copy=False)
if video_backend == "pyav":
frame = av.VideoFrame.from_ndarray(quantized, format=pix_fmt)
write_u16_plane(frame.planes[0], quantized)
return frame
else:
return quantized
def dequantize_depth(
quantized: NDArray[np.uint16] | av.VideoFrame | torch.Tensor,
depth_min: float = DEFAULT_DEPTH_MIN,
depth_max: float = DEFAULT_DEPTH_MAX,
shift: float = DEFAULT_DEPTH_SHIFT,
use_log: bool = DEFAULT_DEPTH_USE_LOG,
pix_fmt: str = DEFAULT_DEPTH_PIX_FMT,
output_unit: Literal["m", "mm"] = "mm",
output_tensor: bool = True,
output_channel_last: bool = False,
) -> NDArray[np.uint16] | NDArray[np.float32] | torch.Tensor:
"""Inverse of :func:`quantize_depth`.
Decoding inverts the same normalized code mapping as :func:`quantize_depth`
using ``depth_min`` / ``depth_max`` / ``shift`` (in metres), then returns
the requested output unit. Tuning arguments **must match** :func:`quantize_depth`.
Accepted input layouts :
- ``(H, W, 1)`` or ``(H, W)`` single frame with channel-last.
- ``(..., 1, H, W)`` batched frames with channel-first.
- ``(..., H, W, 1)`` batched frames with channel-last.
Output layout is determined by ``output_channel_last``.
Args:
quantized: 12-bit codes in ``[0, DEPTH_QMAX]``. ``np.ndarray``,
``av.VideoFrame``, or ``torch.Tensor`` (any integer or float dtype).
depth_min, depth_max, shift, use_log: Same as :func:`quantize_depth` (metres).
pix_fmt: Pixel format used to extract the plane from an ``av.VideoFrame``.
output_unit: ``"mm"`` returns ``uint16`` millimetres (rint, clip
``[0, 65535]``) when returning a numpy array, or ``float32`` mm when
``output_tensor=True``. ``"m"`` returns ``float32`` metres in
``[depth_min, depth_max]``.
output_tensor: If True, return a ``torch.Tensor`` instead of a numpy array.
Returns:
Depth map in the requested unit and dtype.
Raises:
ValueError: If ``output_unit`` is not ``"m"`` or ``"mm"``.
ValueError: If ``use_log=True`` and ``depth_min + shift <= 0``.
"""
if output_unit not in ("m", "mm"):
raise ValueError(f"output_unit must be 'm' or 'mm', got {output_unit!r}")
if use_log:
_validate_log_quant_params(depth_min, shift)
if isinstance(quantized, av.VideoFrame):
quantized = quantized.to_ndarray(format=pix_fmt)
# Compute the scale and offset first.
depth_min_m = float(depth_min)
depth_max_m = float(depth_max)
shift_m = float(shift)
if use_log:
log_min = math.log(depth_min_m + shift_m)
log_max = math.log(depth_max_m + shift_m)
scale = (log_max - log_min) / DEPTH_QMAX
offset = log_min
else:
scale = (depth_max_m - depth_min_m) / DEPTH_QMAX
offset = depth_min_m
# ── Torch path: stay on the input device, single fp32 allocation. ────────
if isinstance(quantized, torch.Tensor):
if quantized.ndim >= 3:
# Drop the single-channel dimension so the math runs on (..., H, W).
quantized = quantized.squeeze(-3) if quantized.shape[-3] == 1 else quantized.squeeze(-1)
# Single allocation we own; everything else is in-place.
buf = quantized.to(dtype=torch.float32, copy=True)
buf.mul_(scale).add_(offset)
if use_log:
buf.exp_().sub_(shift_m)
buf.clamp_(depth_min_m, depth_max_m)
buf.unsqueeze_(-1) if output_channel_last else buf.unsqueeze_(-3)
if output_unit == "m":
return buf if output_tensor else buf.cpu().numpy()
# mm path: round + clamp in float32, skipping the uint16 round-trip
# when returning a tensor (torch.uint16 is poorly supported).
buf.mul_(_MM_PER_METRE).round_().clamp_(0.0, _UINT16_MAX)
if output_tensor:
return buf
return buf.cpu().numpy().astype(np.uint16, copy=False)
# ── NumPy path: single fp32 allocation, ``out=`` for in-place math. ─────
arr = np.asarray(quantized)
if arr.ndim >= 3:
# Drop the single-channel dimension so the math runs on (..., H, W).
arr = np.squeeze(arr, axis=-3) if arr.shape[-3] == 1 else np.squeeze(arr, axis=-1)
buf = np.empty(arr.shape, dtype=np.float32)
np.multiply(arr, scale, out=buf)
np.add(buf, offset, out=buf)
if use_log:
np.exp(buf, out=buf)
np.subtract(buf, shift_m, out=buf)
np.clip(buf, depth_min_m, depth_max_m, out=buf)
buf = np.expand_dims(buf, axis=-1) if output_channel_last else np.expand_dims(buf, axis=-3)
if output_unit == "m":
return torch.from_numpy(buf) if output_tensor else buf
np.multiply(buf, _MM_PER_METRE, out=buf)
np.rint(buf, out=buf)
np.clip(buf, 0.0, _UINT16_MAX, out=buf)
if output_tensor:
# torch.uint16 support is very limited; return float32 millimetres.
return torch.from_numpy(buf)
return buf.astype(np.uint16, copy=False)
+1
View File
@@ -96,6 +96,7 @@ def make_dataset(cfg: TrainPipelineConfig) -> LeRobotDataset | MultiLeRobotDatas
revision=cfg.dataset.revision,
video_backend=cfg.dataset.video_backend,
return_uint8=True,
depth_output_unit=cfg.dataset.depth_output_unit,
tolerance_s=cfg.tolerance_s,
)
else:
+1 -1
View File
@@ -336,7 +336,7 @@ def validate_feature_image_or_video(
Args:
name (str): The name of the feature.
expected_shape (list[str]): The expected shape (C, H, W).
expected_shape (list[str]): The expected shape, e.g. (C, H, W) or (H, W, C).
value: The image data to validate.
Returns:
+51 -5
View File
@@ -42,10 +42,41 @@ def safe_stop_image_writer(func):
def image_array_to_pil_image(image_array: np.ndarray, range_check: bool = True) -> PIL.Image.Image:
# TODO(aliberts): handle 1 channel and 4 for depth images
if image_array.ndim != 3:
raise ValueError(f"The array has {image_array.ndim} dimensions, but 3 is expected for an image.")
"""Convert a NumPy array to a PIL Image, preserving precision for grayscale.
Behaviour by shape:
- ``(H, W)`` or ``(1, H, W)`` / ``(H, W, 1)``: single-channel grayscale.
The native dtype is preserved using the matching PIL mode
(``I;16`` / ``F``). This is the path used for raw depth maps (no rescaling, clamping, or downcasting)
- ``(3, H, W)`` / ``(H, W, 3)``: RGB. Channels-first inputs are transposed
to channels-last. Float inputs in ``[0, 1]`` are scaled to ``uint8``
(existing behaviour, gated by ``range_check``).
Other shapes / channel counts raise ``NotImplementedError`` or
``ValueError``.
"""
# TODO(CarolinePascal): 4 dimensions RGB-D images
if image_array.ndim not in (2, 3):
raise ValueError(f"The array has {image_array.ndim} dimensions, but 2 or 3 is expected for an image.")
# Squeeze 3D single-channel inputs to 2D so depth maps work whether the
# caller emits (H, W), (1, H, W), or (H, W, 1).
if image_array.ndim == 3:
if image_array.shape[0] == 1:
image_array = image_array[0]
elif image_array.shape[-1] == 1:
image_array = image_array[..., 0]
if image_array.ndim == 2:
if image_array.dtype not in [np.uint16, np.float32]:
raise ValueError(
f"Unsupported single-channel image dtype: {image_array.dtype}. "
f"Supported dtypes: {sorted(str(d) for d in [np.uint16, np.float32])}."
)
return PIL.Image.fromarray(np.ascontiguousarray(image_array))
# 3D path: must be RGB (3 channels), channels-first or channels-last.
if image_array.shape[0] == 3:
# Transpose from pytorch convention (C, H, W) to (H, W, C)
image_array = image_array.transpose(1, 2, 0)
@@ -71,13 +102,28 @@ def image_array_to_pil_image(image_array: np.ndarray, range_check: bool = True)
return PIL.Image.fromarray(image_array)
def save_kwargs_for_path(fpath: Path, compress_level: int) -> dict:
"""Pick the right format-specific kwargs for :meth:`PIL.Image.Image.save`.
PNG uses ``compress_level`` (0-9, zlib). TIFF uses ``compression`` (raw) for lossless raw depth maps.
"""
suffix = Path(fpath).suffix.lower()
if suffix == ".png":
return {"compress_level": compress_level}
if suffix in (".tif", ".tiff"):
return {"compression": "raw"}
return {}
def write_image(image: np.ndarray | PIL.Image.Image, fpath: Path, compress_level: int = 1):
"""
Saves a NumPy array or PIL Image to a file.
This function handles both NumPy arrays and PIL Image objects, converting
the former to a PIL Image before saving. It includes error handling for
the save operation.
the save operation. The output format is inferred from the *fpath*
extension: ``.png`` PNG with ``compress_level``, ``.tiff`` / ``.tif``
lossless raw depth maps (TIFF).
Args:
image (np.ndarray | PIL.Image.Image): The image data to save.
@@ -101,7 +147,7 @@ def write_image(image: np.ndarray | PIL.Image.Image, fpath: Path, compress_level
img = image
else:
raise TypeError(f"Unsupported image type: {type(image)}")
img.save(fpath, compress_level=compress_level)
img.save(fpath, **save_kwargs_for_path(fpath, compress_level))
except Exception as e:
logger.error("Error writing image %s: %s", fpath, e)
+25 -3
View File
@@ -24,7 +24,7 @@ import torch.utils
from huggingface_hub import HfApi, snapshot_download
from huggingface_hub.errors import RevisionNotFoundError
from lerobot.configs import VideoEncoderConfig
from lerobot.configs import DepthEncoderConfig, VideoEncoderConfig
from lerobot.utils.constants import HF_LEROBOT_HUB_CACHE
from .dataset_metadata import CODEBASE_VERSION, LeRobotDatasetMetadata
@@ -58,8 +58,10 @@ class LeRobotDataset(torch.utils.data.Dataset):
download_videos: bool = True,
video_backend: str | None = None,
return_uint8: bool = False,
depth_output_unit: str = "mm",
batch_encoding_size: int = 1,
camera_encoder: VideoEncoderConfig | None = None,
depth_encoder: DepthEncoderConfig | None = None,
encoder_threads: int | None = None,
streaming_encoding: bool = False,
encoder_queue_maxsize: int = 30,
@@ -186,6 +188,9 @@ class LeRobotDataset(torch.utils.data.Dataset):
camera_encoder (VideoEncoderConfig | None, optional): Video encoder settings for cameras
(codec, quality, etc.). When ``None``, :func:`~lerobot.configs.video.camera_encoder_defaults`
is used by the writer.
depth_encoder (DepthEncoderConfig | None, optional): Video encoder settings for depth cameras
(codec, quality, etc.). When ``None``, :func:`~lerobot.configs.depth.depth_encoder_defaults`
is used by the writer.
encoder_threads (int | None, optional): Number of encoder threads (global). ``None`` lets the
codec decide.
streaming_encoding (bool, optional): If True, encode video frames in real-time during capture
@@ -208,6 +213,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.revision = revision if revision else CODEBASE_VERSION
self._video_backend = video_backend if video_backend else get_safe_default_video_backend()
self._return_uint8 = return_uint8
self._depth_output_unit = depth_output_unit
self._batch_encoding_size = batch_encoding_size
self._encoder_threads = encoder_threads
@@ -248,6 +254,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
delta_timestamps=delta_timestamps,
image_transforms=image_transforms,
return_uint8=self._return_uint8,
depth_output_unit=self._depth_output_unit,
)
# Load actual data
@@ -273,6 +280,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
streaming_enc = self._build_streaming_encoder(
self.meta.fps,
camera_encoder,
depth_encoder,
encoder_queue_maxsize,
encoder_threads,
)
@@ -280,6 +288,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
meta=self.meta,
root=self.root,
camera_encoder=camera_encoder,
depth_encoder=depth_encoder,
encoder_threads=encoder_threads,
batch_encoding_size=batch_encoding_size,
streaming_encoder=streaming_enc,
@@ -315,6 +324,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
delta_timestamps=self.delta_timestamps,
image_transforms=self.image_transforms,
return_uint8=self._return_uint8,
depth_output_unit=self._depth_output_unit,
)
return self.reader
@@ -322,12 +332,14 @@ class LeRobotDataset(torch.utils.data.Dataset):
def _build_streaming_encoder(
fps: int,
camera_encoder: VideoEncoderConfig | None,
depth_encoder: DepthEncoderConfig | None,
encoder_queue_maxsize: int,
encoder_threads: int | None,
) -> StreamingVideoEncoder:
return StreamingVideoEncoder(
fps=fps,
camera_encoder=camera_encoder,
depth_encoder=depth_encoder,
queue_maxsize=encoder_queue_maxsize,
encoder_threads=encoder_threads,
)
@@ -645,6 +657,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
video_backend: str | None = None,
batch_encoding_size: int = 1,
camera_encoder: VideoEncoderConfig | None = None,
depth_encoder: DepthEncoderConfig | None = None,
metadata_buffer_size: int = 10,
streaming_encoding: bool = False,
encoder_queue_maxsize: int = 30,
@@ -677,6 +690,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
batch-encoding videos. ``1`` means encode immediately.
camera_encoder: Video encoder settings for cameras (codec, quality, etc.).
When ``None``, :func:`~lerobot.configs.video.camera_encoder_defaults` is used.
depth_encoder: Video encoder settings for depth cameras (codec, quality, etc.).
When ``None``, :func:`~lerobot.configs.depth.depth_encoder_defaults` is used.
encoder_threads: Number of encoder threads (global). ``None``
lets the codec decide.
metadata_buffer_size: Number of episode metadata records to buffer
@@ -711,6 +726,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
obj.episodes = None
obj._video_backend = video_backend if video_backend is not None else get_safe_default_video_backend()
obj._return_uint8 = False
obj._depth_output_unit = "mm"
obj._batch_encoding_size = batch_encoding_size
obj._encoder_threads = encoder_threads
@@ -720,12 +736,13 @@ class LeRobotDataset(torch.utils.data.Dataset):
streaming_enc = None
if streaming_encoding and len(obj.meta.video_keys) > 0:
streaming_enc = cls._build_streaming_encoder(
fps, camera_encoder, encoder_queue_maxsize, encoder_threads
fps, camera_encoder, depth_encoder, encoder_queue_maxsize, encoder_threads
)
obj.writer = DatasetWriter(
meta=obj.meta,
root=obj.root,
camera_encoder=camera_encoder,
depth_encoder=depth_encoder,
encoder_threads=encoder_threads,
batch_encoding_size=batch_encoding_size,
streaming_encoder=streaming_enc,
@@ -749,6 +766,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
video_backend: str | None = None,
batch_encoding_size: int = 1,
camera_encoder: VideoEncoderConfig | None = None,
depth_encoder: DepthEncoderConfig | None = None,
encoder_threads: int | None = None,
image_writer_processes: int = 0,
image_writer_threads: int = 0,
@@ -778,6 +796,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
batch-encoding videos.
camera_encoder: Video encoder settings for cameras (codec, quality, etc.).
When ``None``, :func:`~lerobot.configs.video.camera_encoder_defaults` is used.
depth_encoder: Video encoder settings for depth cameras (codec, quality, etc.).
When ``None``, :func:`~lerobot.configs.depth.depth_encoder_defaults` is used.
encoder_threads: Number of encoder threads (global). ``None``
lets the codec decide.
image_writer_processes: Subprocesses for async image writing.
@@ -805,6 +825,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
obj.episodes = None
obj._video_backend = video_backend if video_backend else get_safe_default_video_backend()
obj._return_uint8 = False
obj._depth_output_unit = "mm"
obj._batch_encoding_size = batch_encoding_size
if obj._requested_root is not None:
@@ -824,12 +845,13 @@ class LeRobotDataset(torch.utils.data.Dataset):
streaming_enc = None
if streaming_encoding and len(obj.meta.video_keys) > 0:
streaming_enc = cls._build_streaming_encoder(
obj.meta.fps, camera_encoder, encoder_queue_maxsize, encoder_threads
obj.meta.fps, camera_encoder, depth_encoder, encoder_queue_maxsize, encoder_threads
)
obj.writer = DatasetWriter(
meta=obj.meta,
root=obj.root,
camera_encoder=camera_encoder,
depth_encoder=depth_encoder,
encoder_threads=encoder_threads,
batch_encoding_size=batch_encoding_size,
streaming_encoder=streaming_enc,
+37 -2
View File
@@ -24,6 +24,7 @@ import logging
from typing import Any
import av
import numpy as np
logger = logging.getLogger(__name__)
@@ -31,6 +32,22 @@ FFMPEG_NUMERIC_OPTION_TYPES = ("INT", "INT64", "UINT64", "FLOAT", "DOUBLE")
FFMPEG_INTEGER_OPTION_TYPES = ("INT", "INT64", "UINT64")
def write_u16_plane(plane: av.video.plane.VideoPlane, src: np.ndarray, fill_value: int | None = None) -> None:
"""Copy ``src`` into a uint16 plane respecting FFmpeg line padding."""
height, width = src.shape
stride_u16 = plane.line_size // np.dtype(np.uint16).itemsize
dst = np.frombuffer(plane, dtype=np.uint16).reshape(height, stride_u16)
if fill_value is not None:
dst.fill(fill_value)
dst[:, :width] = src
@functools.cache
def get_pix_fmt_channels(pix_fmt: str) -> int:
"""Return the number of components (channels) for *pix_fmt*."""
return len(av.VideoFormat(pix_fmt).components)
@functools.cache
def get_codec(vcodec: str) -> av.codec.Codec | None:
"""PyAV write-mode ``Codec`` for *vcodec*, or ``None`` if unavailable."""
@@ -92,7 +109,7 @@ def _check_option_value(vcodec: str, label: str, value: Any, opt: av.option.Opti
f"{label}={value!r} is not numeric; codec {vcodec!r} expects a number for this option."
) from e
elif isinstance(value, (float, int)):
num_val = value
num_val = float(value)
else:
raise ValueError(
f"{label}={value!r} is not numeric; codec {vcodec!r} expects a number for this option."
@@ -142,6 +159,16 @@ def _check_pixel_format(vcodec: str, pix_fmt: str) -> None:
)
def _check_pix_fmt_channels(pix_fmt: str, channels: int) -> None:
"""Ensure *pix_fmt* can carry at least *channels* components."""
pix_fmt_channels = get_pix_fmt_channels(pix_fmt)
if pix_fmt_channels < channels:
raise ValueError(
f"pix_fmt={pix_fmt!r} carries only {pix_fmt_channels} component(s) "
f"but the source data has {channels} channel(s)."
)
def _check_codec_options(vcodec: str, codec_options: dict[str, Any]) -> None:
"""Validate merged encoder options (typed) against the codec's published AVOptions."""
supported_options = _get_codec_options_by_name(vcodec)
@@ -156,12 +183,18 @@ def _check_codec_options(vcodec: str, codec_options: dict[str, Any]) -> None:
_check_option_value(vcodec, key, value, supported_options[key])
def check_video_encoder_parameters_pyav(vcodec: str, pix_fmt: str, codec_options: dict[str, Any]) -> None:
def check_video_encoder_parameters_pyav(
vcodec: str,
pix_fmt: str,
codec_options: dict[str, Any],
channels: int | None = None,
) -> None:
"""Verify *config* is compatible with the bundled FFmpeg build.
Checks pixel format, abstract tuning-field compatibility, and each merged
encoder option from :meth:`~lerobot.configs.video.VideoEncoderConfig.get_codec_options`
against PyAV (including numeric ``extra_options`` present in that dict).
When given, additionally verify that *pix_fmt* carries as many components as the source data channels.
No-op when ``config.vcodec`` isn't in the local FFmpeg build.
Raises:
@@ -171,4 +204,6 @@ def check_video_encoder_parameters_pyav(vcodec: str, pix_fmt: str, codec_options
if not options:
raise ValueError(f"Codec {vcodec!r} is not available in the bundled FFmpeg build")
_check_pixel_format(vcodec, pix_fmt)
if channels is not None:
_check_pix_fmt_channels(pix_fmt, channels)
_check_codec_options(vcodec, codec_options)
+1
View File
@@ -92,6 +92,7 @@ DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"
DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png"
DEFAULT_DEPTH_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.tiff"
LEGACY_EPISODES_PATH = "meta/episodes.jsonl"
LEGACY_EPISODES_STATS_PATH = "meta/episodes_stats.jsonl"
+110 -64
View File
@@ -39,11 +39,16 @@ from datasets.features.features import register_feature
from PIL import Image
from lerobot.configs import (
DepthEncoderConfig,
VideoEncoderConfig,
camera_encoder_defaults,
depth_encoder_defaults,
)
from lerobot.utils.import_utils import get_safe_default_video_backend
from .depth_utils import quantize_depth
from .pyav_utils import get_pix_fmt_channels
logger = logging.getLogger(__name__)
@@ -53,6 +58,7 @@ def decode_video_frames(
tolerance_s: float,
backend: str | None = None,
return_uint8: bool = False,
is_depth: bool = False,
) -> torch.Tensor:
"""
Decodes video frames using the specified backend.
@@ -64,23 +70,35 @@ def decode_video_frames(
backend (str, optional): Backend to use for decoding. Defaults to "torchcodec" when available
in the platform; otherwise, defaults to "pyav". The legacy value "video_reader" is
accepted for one release as an alias for "pyav" and will be removed in a future version.
return_uint8 (bool): If True, return raw uint8 frames without float32 normalization.
return_uint8 (bool): For RGB videos, if True return raw uint8 frames without float32 normalization.
This reduces memory for DataLoader IPC; normalization can be done on GPU afterward.
is_depth (bool): Set to True if the video is a depth map (1 channel, uint12).
Returns:
torch.Tensor: Decoded frames (float32 in [0,1] by default, or uint8 if return_uint8=True).
torch.Tensor: Decoded frames (RGB: float32 in [0,1] by default, or uint8 if return_uint8=True, Depth: uint12).
Currently supports torchcodec on cpu and pyav.
"""
if backend != "pyav" and is_depth:
logger.warning("Decoding depth maps is only supported with the 'pyav' backend.")
# We do not actually return uint8 here, but we avoid the 255 normalization step.
return decode_video_frames_pyav(
video_path, timestamps, tolerance_s, return_uint8=False, is_depth=True
)
if backend is None:
backend = get_safe_default_video_backend()
if backend == "torchcodec":
return decode_video_frames_torchcodec(video_path, timestamps, tolerance_s, return_uint8=return_uint8)
elif backend == "pyav":
return decode_video_frames_pyav(video_path, timestamps, tolerance_s, return_uint8=return_uint8)
return decode_video_frames_pyav(
video_path, timestamps, tolerance_s, return_uint8=return_uint8, is_depth=is_depth
)
elif backend == "video_reader":
logger.warning("backend='video_reader' is deprecated and now aliases to 'pyav'.")
return decode_video_frames_pyav(video_path, timestamps, tolerance_s, return_uint8=return_uint8)
return decode_video_frames_pyav(
video_path, timestamps, tolerance_s, return_uint8=return_uint8, is_depth=is_depth
)
else:
raise ValueError(f"Unsupported video backend: {backend}")
@@ -91,6 +109,7 @@ def decode_video_frames_pyav(
tolerance_s: float,
log_loaded_timestamps: bool = False,
return_uint8: bool = False,
is_depth: bool = False,
) -> torch.Tensor:
"""Loads frames associated to the requested timestamps of a video using PyAV.
@@ -109,8 +128,9 @@ def decode_video_frames_pyav(
tolerance_s: Allowed deviation in seconds between a queried timestamp and the closest
decoded frame.
log_loaded_timestamps: When True, log every decoded frame's timestamp at INFO level.
return_uint8: When True, return raw uint8 frames (C, H, W). Otherwise, return float32 in
[0, 1] range.
return_uint8: For RGB videos, if True return raw uint8 frames (C, H, W).
Otherwise, return float32 in [0, 1] range.
is_depth: Set to True if the video is a depth map (1 channel, uint12).
Returns:
torch.Tensor of shape (len(timestamps), C, H, W).
@@ -140,9 +160,13 @@ def decode_video_frames_pyav(
current_ts = float(frame.pts * stream.time_base)
if log_loaded_timestamps:
logger.info(f"frame loaded at timestamp={current_ts:.4f}")
# Convert to CHW uint8 to match torchcodec's output layout.
arr = frame.to_ndarray(format="rgb24") # H, W, 3
loaded_frames.append(torch.from_numpy(arr).permute(2, 0, 1).contiguous())
if is_depth:
arr = frame.to_ndarray(format="gray12le") # (H, W) uint12
loaded_frames.append(torch.from_numpy(arr).unsqueeze(0).contiguous())
else:
arr = frame.to_ndarray(format="rgb24") # (H, W, 3)
# Convert to CHW uint8 to match torchcodec's output layout.
loaded_frames.append(torch.from_numpy(arr).permute(2, 0, 1).contiguous())
loaded_ts.append(current_ts)
if current_ts >= last_ts:
break
@@ -185,7 +209,7 @@ def decode_video_frames_pyav(
f"number of queried timestamps ({len(timestamps)})"
)
if return_uint8:
if return_uint8 or is_depth:
return closest_frames
# convert to the pytorch format which is float32 in [0,1] range (and channel first)
@@ -406,17 +430,17 @@ def encode_video_frames(
imgs_dir: Path | str,
video_path: Path | str,
fps: int,
camera_encoder: VideoEncoderConfig | None = None,
video_encoder: VideoEncoderConfig | None = None,
encoder_threads: int | None = None,
*,
log_level: int | None = av.logging.WARNING,
overwrite: bool = False,
) -> None:
"""More info on ffmpeg arguments tuning on `benchmark/video/README.md`"""
if camera_encoder is None:
camera_encoder = camera_encoder_defaults()
vcodec = camera_encoder.vcodec
pix_fmt = camera_encoder.pix_fmt
if video_encoder is None:
video_encoder = camera_encoder_defaults()
vcodec = video_encoder.vcodec
pix_fmt = video_encoder.pix_fmt
video_path = Path(video_path)
imgs_dir = Path(imgs_dir)
@@ -428,7 +452,9 @@ def encode_video_frames(
video_path.parent.mkdir(parents=True, exist_ok=True)
# Get input frames
template = "frame-" + ("[0-9]" * 6) + ".png"
is_depth = isinstance(video_encoder, DepthEncoderConfig)
suffix = ".png" if not is_depth else ".tiff"
template = "frame-" + ("[0-9]" * 6) + suffix
input_list = sorted(
glob.glob(str(imgs_dir / template)), key=lambda x: int(x.split("-")[-1].split(".")[0])
)
@@ -438,7 +464,7 @@ def encode_video_frames(
with Image.open(input_list[0]) as dummy_image:
width, height = dummy_image.size
video_options = camera_encoder.get_codec_options(encoder_threads, as_strings=True)
video_options = video_encoder.get_codec_options(encoder_threads, as_strings=True)
# Set logging level
if log_level is not None:
@@ -455,8 +481,19 @@ def encode_video_frames(
# Loop through input frames and encode them
for input_data in input_list:
with Image.open(input_data) as input_image:
input_image = input_image.convert("RGB")
input_frame = av.VideoFrame.from_image(input_image)
if is_depth:
input_frame = quantize_depth(
np.array(input_image),
depth_min=video_encoder.depth_min,
depth_max=video_encoder.depth_max,
shift=video_encoder.shift,
use_log=video_encoder.use_log,
pix_fmt=video_encoder.pix_fmt,
video_backend="pyav",
)
else:
input_image = input_image.convert("RGB")
input_frame = av.VideoFrame.from_image(input_image)
packet = output_stream.encode(input_frame)
if packet:
output.mux(packet)
@@ -477,7 +514,7 @@ def encode_video_frames(
def reencode_video(
input_video_path: Path | str,
output_video_path: Path | str,
camera_encoder: VideoEncoderConfig | None = None,
video_encoder: VideoEncoderConfig | None = None,
encoder_threads: int | None = None,
log_level: int | None = av.logging.WARNING,
overwrite: bool = False,
@@ -487,13 +524,13 @@ def reencode_video(
Args:
input_video_path: Existing video file to read.
output_video_path: Path for the re-encoded file.
camera_encoder: Encoder configuration. Defaults to :func:`camera_encoder_defaults`.
video_encoder: Encoder configuration. Defaults to :func:`camera_encoder_defaults`.
encoder_threads: Optional thread count forwarded to :meth:`VideoEncoderConfig.get_codec_options`.
log_level: libav log level while encoding, or ``None`` to leave logging unchanged. Defaults to WARNING.
overwrite: When ``False`` and ``output_video_path`` already exists, skip and log a warning.
"""
camera_encoder = camera_encoder or camera_encoder_defaults()
video_encoder = video_encoder or camera_encoder_defaults()
output_video_path = Path(output_video_path)
@@ -503,9 +540,9 @@ def reencode_video(
output_video_path.parent.mkdir(parents=True, exist_ok=True)
video_options = camera_encoder.get_codec_options(encoder_threads, as_strings=True)
vcodec = camera_encoder.vcodec
pix_fmt = camera_encoder.pix_fmt
video_options = video_encoder.get_codec_options(encoder_threads, as_strings=True)
vcodec = video_encoder.vcodec
pix_fmt = video_encoder.pix_fmt
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_named_file:
tmp_output_video_path = tmp_named_file.name
@@ -676,22 +713,21 @@ class _CameraEncoderThread(threading.Thread):
self,
video_path: Path,
fps: int,
vcodec: str,
pix_fmt: str,
codec_options: dict[str, str],
video_encoder: VideoEncoderConfig,
frame_queue: queue.Queue,
result_queue: queue.Queue,
stop_event: threading.Event,
encoder_threads: int | None = None,
):
super().__init__(daemon=True)
self.video_path = video_path
self.fps = fps
self.vcodec = vcodec
self.pix_fmt = pix_fmt
self.codec_options = codec_options
self.video_encoder = video_encoder
self.is_depth = isinstance(video_encoder, DepthEncoderConfig)
self.frame_queue = frame_queue
self.result_queue = result_queue
self.stop_event = stop_event
self.encoder_threads = encoder_threads
def run(self) -> None:
from .compute_stats import RunningQuantileStats, auto_downsample_height_width
@@ -716,12 +752,12 @@ class _CameraEncoderThread(threading.Thread):
# Sentinel: flush and close
break
# Ensure HWC uint8 numpy array
# Ensure HWC (RGB or depth) uint8 (RGB only) numpy array
if isinstance(frame_data, np.ndarray):
if frame_data.ndim == 3 and frame_data.shape[0] == 3:
if frame_data.ndim == 3 and frame_data.shape[0] in (1, 3):
# CHW -> HWC
frame_data = frame_data.transpose(1, 2, 0)
if frame_data.dtype != np.uint8:
if not self.is_depth and frame_data.dtype != np.uint8:
frame_data = (frame_data * 255).astype(np.uint8)
# Open container on first frame (to get width/height)
@@ -729,15 +765,29 @@ class _CameraEncoderThread(threading.Thread):
height, width = frame_data.shape[:2]
Path(self.video_path).parent.mkdir(parents=True, exist_ok=True)
container = av.open(str(self.video_path), "w")
output_stream = container.add_stream(self.vcodec, self.fps, options=self.codec_options)
output_stream.pix_fmt = self.pix_fmt
output_stream = container.add_stream(
self.video_encoder.vcodec,
self.fps,
options=self.video_encoder.get_codec_options(self.encoder_threads, as_strings=True),
)
output_stream.pix_fmt = self.video_encoder.pix_fmt
output_stream.width = width
output_stream.height = height
output_stream.time_base = Fraction(1, self.fps)
# Encode frame with explicit timestamps
pil_img = Image.fromarray(frame_data)
video_frame = av.VideoFrame.from_image(pil_img)
if not self.is_depth:
pil_img = Image.fromarray(frame_data)
video_frame = av.VideoFrame.from_image(pil_img)
else:
video_frame = quantize_depth(
frame_data,
depth_min=self.video_encoder.depth_min,
depth_max=self.video_encoder.depth_max,
shift=self.video_encoder.shift,
use_log=self.video_encoder.use_log,
video_backend=self.video_encoder.video_backend,
)
video_frame.pts = frame_count
video_frame.time_base = Fraction(1, self.fps)
packet = output_stream.encode(video_frame)
@@ -796,6 +846,7 @@ class StreamingVideoEncoder:
self,
fps: int,
camera_encoder: VideoEncoderConfig | None = None,
depth_encoder: DepthEncoderConfig | None = None,
queue_maxsize: int = 30,
encoder_threads: int | None = None,
):
@@ -811,6 +862,7 @@ class StreamingVideoEncoder:
"""
self.fps = fps
self._camera_encoder = camera_encoder or camera_encoder_defaults()
self._depth_encoder = depth_encoder or depth_encoder_defaults()
self._encoder_threads = encoder_threads
self.queue_maxsize = queue_maxsize
@@ -823,18 +875,25 @@ class StreamingVideoEncoder:
self._episode_active = False
self._closed = False
def start_episode(self, video_keys: list[str], temp_dir: Path) -> None:
def start_episode(
self, video_keys: list[str], temp_dir: Path, depth_video_keys: list[str] | None = None
) -> None:
"""Start encoder threads for a new episode.
Args:
video_keys: List of video feature keys (e.g. ["observation.images.laptop"])
temp_dir: Base directory for temporary MP4 files
depth_video_keys: List of video or image feature keys that carry depth maps (e.g.
["observation.images.laptop_depth"]). Defaults to ``[]`` (no depth keys).
"""
if self._episode_active:
self.cancel_episode()
self._dropped_frames.clear()
if depth_video_keys is None:
depth_video_keys = []
for video_key in video_keys:
frame_queue: queue.Queue = queue.Queue(maxsize=self.queue_maxsize)
result_queue: queue.Queue = queue.Queue(maxsize=1)
@@ -843,17 +902,15 @@ class StreamingVideoEncoder:
temp_video_dir = Path(tempfile.mkdtemp(dir=temp_dir))
video_path = temp_video_dir / f"{video_key.replace('/', '_')}_streaming.mp4"
vcodec = self._camera_encoder.vcodec
codec_options = self._camera_encoder.get_codec_options(self._encoder_threads, as_strings=True)
encoder = self._depth_encoder if video_key in depth_video_keys else self._camera_encoder
encoder_thread = _CameraEncoderThread(
video_path=video_path,
fps=self.fps,
vcodec=vcodec,
pix_fmt=self._camera_encoder.pix_fmt,
codec_options=codec_options,
video_encoder=encoder,
frame_queue=frame_queue,
result_queue=result_queue,
stop_event=stop_event,
encoder_threads=self._encoder_threads,
)
encoder_thread.start()
@@ -1060,13 +1117,13 @@ def get_audio_info(video_path: Path | str) -> dict:
def get_video_info(
video_path: Path | str,
camera_encoder: VideoEncoderConfig | None = None,
video_encoder: VideoEncoderConfig | None = None,
) -> dict:
"""Build the ``video.*`` / ``audio.*`` info dict persisted in ``info.json``.
Args:
video_path: Path to the encoded video file to probe.
camera_encoder: If provided, record the exact encoder settings used to encode this
video_encoder: If provided, record the exact encoder settings used to encode this
video. Stream-derived values take precedence encoder fields are only written for keys
not already populated from the video file itself.
"""
@@ -1086,13 +1143,10 @@ def get_video_info(
video_info["video.width"] = video_stream.width
video_info["video.codec"] = video_stream.codec.canonical_name
video_info["video.pix_fmt"] = video_stream.pix_fmt
video_info["video.is_depth_map"] = False
# Calculate fps from r_frame_rate
video_info["video.fps"] = int(video_stream.base_rate)
pixel_channels = get_video_pixel_channels(video_stream.pix_fmt)
video_info["video.channels"] = pixel_channels
video_info["video.channels"] = get_pix_fmt_channels(video_stream.pix_fmt)
# Reset logging level
av.logging.restore_default_callback()
@@ -1101,27 +1155,18 @@ def get_video_info(
video_info.update(**get_audio_info(video_path))
# Add additional encoder configuration if provided
if camera_encoder is not None:
for field_name, field_value in asdict(camera_encoder).items():
if video_encoder is not None:
for field_name, field_value in asdict(video_encoder).items():
# vcodec is already populated from the video stream
if field_name == "vcodec":
continue
video_info.setdefault(f"video.{field_name}", field_value)
video_info["is_depth_map"] = isinstance(video_encoder, DepthEncoderConfig)
return video_info
def get_video_pixel_channels(pix_fmt: str) -> int:
if "gray" in pix_fmt or "depth" in pix_fmt or "monochrome" in pix_fmt:
return 1
elif "rgba" in pix_fmt or "yuva" in pix_fmt:
return 4
elif "rgb" in pix_fmt or "yuv" in pix_fmt:
return 3
else:
raise ValueError("Unknown format")
def get_video_duration_in_s(video_path: Path | str) -> float:
"""
Get the duration of a video file in seconds using PyAV.
@@ -1182,7 +1227,8 @@ class VideoEncodingManager:
img_dir = self.dataset.root / "images"
if img_dir.exists():
png_files = list(img_dir.rglob("*.png"))
if len(png_files) == 0:
tiff_files = list(img_dir.rglob("*.tiff"))
if len(png_files) == 0 and len(tiff_files) == 0:
shutil.rmtree(img_dir)
logger.debug("Cleaned up empty images directory")
else:
+2 -1
View File
@@ -126,7 +126,8 @@ def prepare_observation_for_inference(
for name in observation:
observation[name] = torch.from_numpy(observation[name])
if "image" in name:
observation[name] = observation[name].type(torch.float32) / 255
if observation[name].dtype == torch.uint8:
observation[name] = observation[name].type(torch.float32) / 255
observation[name] = observation[name].permute(2, 0, 1).contiguous()
observation[name] = observation[name].unsqueeze(0)
observation[name] = observation[name].to(device)
+12 -3
View File
@@ -68,9 +68,12 @@ class SOFollower(Robot):
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
features: dict[str, tuple] = {}
for cam in self.cameras:
features[cam] = (self.cameras[cam].height, self.cameras[cam].width, 3)
if getattr(self.cameras[cam], "use_depth", False):
features[f"{cam}_depth"] = (self.cameras[cam].height, self.cameras[cam].width, 1)
return features
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
@@ -190,6 +193,12 @@ class SOFollower(Robot):
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
if getattr(cam, "use_depth", False):
start = time.perf_counter()
obs_dict[f"{cam_key}_depth"] = cam.read_latest_depth()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key} depth: {dt_ms:.1f}ms")
return obs_dict
@check_if_not_connected
+2
View File
@@ -333,6 +333,7 @@ def build_rollout_context(
root=cfg.dataset.root,
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
camera_encoder=cfg.dataset.camera_encoder,
depth_encoder=cfg.dataset.depth_encoder,
streaming_encoding=cfg.dataset.streaming_encoding,
encoder_queue_maxsize=cfg.dataset.encoder_queue_maxsize,
encoder_threads=cfg.dataset.encoder_threads,
@@ -368,6 +369,7 @@ def build_rollout_context(
* len(robot.cameras if hasattr(robot, "cameras") else []),
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
camera_encoder=cfg.dataset.camera_encoder,
depth_encoder=cfg.dataset.depth_encoder,
streaming_encoding=cfg.dataset.streaming_encoding,
encoder_queue_maxsize=cfg.dataset.encoder_queue_maxsize,
encoder_threads=cfg.dataset.encoder_threads,
+2
View File
@@ -403,6 +403,7 @@ def record(
root=cfg.dataset.root,
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
camera_encoder=cfg.dataset.camera_encoder,
depth_encoder=cfg.dataset.depth_encoder,
encoder_threads=cfg.dataset.encoder_threads,
streaming_encoding=cfg.dataset.streaming_encoding,
encoder_queue_maxsize=cfg.dataset.encoder_queue_maxsize,
@@ -432,6 +433,7 @@ def record(
image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras),
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
camera_encoder=cfg.dataset.camera_encoder,
depth_encoder=cfg.dataset.depth_encoder,
encoder_threads=cfg.dataset.encoder_threads,
streaming_encoding=cfg.dataset.streaming_encoding,
encoder_queue_maxsize=cfg.dataset.encoder_queue_maxsize,
+19 -10
View File
@@ -69,6 +69,7 @@ def hw_to_dataset_features(
for key, ftype in hw_features.items()
if ftype is float or (isinstance(ftype, PolicyFeature) and ftype.type != FeatureType.VISUAL)
}
# TODO(CarolinePascal): we should not rely on the shape to determine if a feature is a camera !
cam_fts = {key: shape for key, shape in hw_features.items() if isinstance(shape, tuple)}
if joint_fts and prefix == ACTION:
@@ -86,11 +87,19 @@ def hw_to_dataset_features(
}
for key, shape in cam_fts.items():
features[f"{prefix}.images.{key}"] = {
"dtype": "video" if use_video else "image",
"shape": shape,
"names": ["height", "width", "channels"],
}
dtype = "video" if use_video else "image"
if len(shape) == 3 and shape[2] in (1, 3):
features[f"{prefix}.images.{key}"] = {
"dtype": dtype,
"shape": shape,
"names": ["height", "width", "channels"],
"info": {"is_depth_map": shape[2] == 1},
}
else:
raise ValueError(
f"Camera feature '{key}' has shape {shape}. "
f"Expected a 3-tuple (H, W, C), e.g. (480, 640, 3) for RGB or (480, 640, 1) for depth."
)
_validate_feature_names(features)
return features
@@ -149,11 +158,11 @@ def dataset_to_policy_features(features: dict[str, dict]) -> dict[str, PolicyFea
type = FeatureType.VISUAL
if len(shape) != 3:
raise ValueError(f"Number of dimensions of {key} != 3 (shape={shape})")
names = ft["names"]
# Backward compatibility for "channel" which is an error introduced in LeRobotDataset v2.0 for ported datasets.
if names[2] in ["channel", "channels"]: # (h, w, c) -> (c, h, w)
shape = (shape[2], shape[0], shape[1])
else:
names = ft["names"]
# Backward compatibility for "channel" which is an error introduced in LeRobotDataset v2.0 for ported datasets.
if names[2] in ["channel", "channels"]: # (h, w, c) -> (c, h, w)
shape = (shape[2], shape[0], shape[1])
elif key == OBS_ENV_STATE:
type = FeatureType.ENV
elif key.startswith(OBS_STR):
+9 -2
View File
@@ -107,8 +107,15 @@ def log_rerun_data(
for i, vi in enumerate(arr):
rr.log(f"{key}_{i}", rr.Scalars(float(vi)))
else:
img_entity = rr.Image(arr).compress() if compress_images else rr.Image(arr)
rr.log(key, entity=img_entity, static=True)
if arr.shape[-1] == 1:
img_entity = (
rr.DepthImage(arr, colormap=rr.components.Colormap.Viridis).compress()
if compress_images
else rr.DepthImage(arr, colormap=rr.components.Colormap.Viridis)
)
else:
img_entity = rr.Image(arr).compress() if compress_images else rr.Image(arr)
rr.log(key, entity=img_entity)
if action:
for k, v in action.items():
+74 -8
View File
@@ -29,7 +29,10 @@ from lerobot.configs import VIDEO_ENCODER_INFO_KEYS
from lerobot.datasets.aggregate import aggregate_datasets
from lerobot.datasets.feature_utils import features_equal_for_merge
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from tests.fixtures.constants import DUMMY_REPO_ID
from tests.fixtures.constants import (
DUMMY_CAMERA_FEATURES_WITH_DEPTH,
DUMMY_REPO_ID,
)
def assert_episode_and_frame_counts(aggr_ds, expected_episodes, expected_frames):
@@ -191,6 +194,26 @@ def assert_dataset_iteration_works(aggr_ds):
pass
def assert_depth_keys_preserved(aggr_ds, ds_0, ds_1):
"""Test that depth keys are correctly preserved after aggregation.
Ensures that the ``is_depth_map`` marker on visual features survives
aggregation, so that downstream consumers (e.g. the dataset reader's
depth decoding path) keep working on the merged dataset.
"""
expected_depth_keys = set(ds_0.meta.depth_keys)
assert expected_depth_keys == set(ds_1.meta.depth_keys), (
"Source datasets disagree on depth_keys; test setup is inconsistent"
)
actual_depth_keys = set(aggr_ds.meta.depth_keys)
assert actual_depth_keys == expected_depth_keys, (
f"Expected depth_keys {expected_depth_keys}, got {actual_depth_keys}"
)
for key in expected_depth_keys:
info = aggr_ds.meta.info.features[key].get("info") or {}
assert info.get("is_depth_map") is True, f"Depth marker lost on feature {key!r} after aggregation"
def assert_video_timestamps_within_bounds(aggr_ds):
"""Test that all video timestamps are within valid bounds for their respective video files.
@@ -240,7 +263,11 @@ def assert_video_timestamps_within_bounds(aggr_ds):
def test_aggregate_datasets(tmp_path, lerobot_dataset_factory):
"""Test basic aggregation functionality with standard parameters."""
"""Test basic aggregation functionality with standard parameters.
Source datasets include both RGB and depth video features so the same
aggregation flow is exercised on the ``is_depth_map`` branch.
"""
ds_0_num_frames = 400
ds_1_num_frames = 800
ds_0_num_episodes = 10
@@ -252,14 +279,21 @@ def test_aggregate_datasets(tmp_path, lerobot_dataset_factory):
repo_id=f"{DUMMY_REPO_ID}_0",
total_episodes=ds_0_num_episodes,
total_frames=ds_0_num_frames,
camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH,
)
ds_1 = lerobot_dataset_factory(
root=tmp_path / "test_1",
repo_id=f"{DUMMY_REPO_ID}_1",
total_episodes=ds_1_num_episodes,
total_frames=ds_1_num_frames,
camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH,
)
# Confirm depth was actually wired into the source datasets so the
# rest of the assertions exercise the depth aggregation path.
assert len(ds_0.meta.depth_keys) > 0, "ds_0 should expose at least one depth key"
assert len(ds_1.meta.depth_keys) > 0, "ds_1 should expose at least one depth key"
aggregate_datasets(
repo_ids=[ds_0.repo_id, ds_1.repo_id],
roots=[ds_0.root, ds_1.root],
@@ -286,6 +320,7 @@ def test_aggregate_datasets(tmp_path, lerobot_dataset_factory):
assert_episode_indices_updated_correctly(aggr_ds, ds_0, ds_1)
assert_video_frames_integrity(aggr_ds, ds_0, ds_1)
assert_video_timestamps_within_bounds(aggr_ds)
assert_depth_keys_preserved(aggr_ds, ds_0, ds_1)
assert_dataset_iteration_works(aggr_ds)
@@ -357,7 +392,11 @@ def test_aggregate_incomplete_video_encoder_info_warns_and_nuls_encoders(
def test_aggregate_with_low_threshold(tmp_path, lerobot_dataset_factory):
"""Test aggregation with small file size limits to force file rotation/sharding."""
"""Test aggregation with small file size limits to force file rotation/sharding.
Depth video features are included to verify that file rotation/concat
correctly handles depth-marked features alongside regular RGB ones.
"""
ds_0_num_episodes = ds_1_num_episodes = 10
ds_0_num_frames = ds_1_num_frames = 400
@@ -366,14 +405,19 @@ def test_aggregate_with_low_threshold(tmp_path, lerobot_dataset_factory):
repo_id=f"{DUMMY_REPO_ID}_small_0",
total_episodes=ds_0_num_episodes,
total_frames=ds_0_num_frames,
camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH,
)
ds_1 = lerobot_dataset_factory(
root=tmp_path / "small_1",
repo_id=f"{DUMMY_REPO_ID}_small_1",
total_episodes=ds_1_num_episodes,
total_frames=ds_1_num_frames,
camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH,
)
assert len(ds_0.meta.depth_keys) > 0, "ds_0 should expose at least one depth key"
assert len(ds_1.meta.depth_keys) > 0, "ds_1 should expose at least one depth key"
# Use the new configurable parameters to force file rotation
aggregate_datasets(
repo_ids=[ds_0.repo_id, ds_1.repo_id],
@@ -404,6 +448,7 @@ def test_aggregate_with_low_threshold(tmp_path, lerobot_dataset_factory):
assert_episode_indices_updated_correctly(aggr_ds, ds_0, ds_1)
assert_video_frames_integrity(aggr_ds, ds_0, ds_1)
assert_video_timestamps_within_bounds(aggr_ds)
assert_depth_keys_preserved(aggr_ds, ds_0, ds_1)
assert_dataset_iteration_works(aggr_ds)
# Check that multiple files were actually created due to small size limits
@@ -423,7 +468,8 @@ def test_video_timestamps_regression(tmp_path, lerobot_dataset_factory):
"""Regression test for video timestamp bug when merging datasets.
This test specifically checks that video timestamps are correctly calculated
and accumulated when merging multiple datasets.
and accumulated when merging multiple datasets. Depth video features are
included so depth timestamps are also covered by the regression.
"""
datasets = []
for i in range(3):
@@ -432,9 +478,13 @@ def test_video_timestamps_regression(tmp_path, lerobot_dataset_factory):
repo_id=f"{DUMMY_REPO_ID}_regression_{i}",
total_episodes=2,
total_frames=100,
camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH,
)
datasets.append(ds)
for i, ds in enumerate(datasets):
assert len(ds.meta.depth_keys) > 0, f"Dataset {i} should expose at least one depth key"
aggregate_datasets(
repo_ids=[ds.repo_id for ds in datasets],
roots=[ds.root for ds in datasets],
@@ -451,12 +501,21 @@ def test_video_timestamps_regression(tmp_path, lerobot_dataset_factory):
aggr_ds = LeRobotDataset(f"{DUMMY_REPO_ID}_regression_aggr", root=tmp_path / "regression_aggr")
assert_video_timestamps_within_bounds(aggr_ds)
# Depth keys must survive the merge for the regression to cover the
# ``is_depth_map`` decoding branch.
assert set(aggr_ds.meta.depth_keys) == set(datasets[0].meta.depth_keys)
depth_keys = set(aggr_ds.meta.depth_keys)
for i in range(len(aggr_ds)):
item = aggr_ds[i]
for key in aggr_ds.meta.video_keys:
assert key in item, f"Video key {key} missing from item {i}"
assert item[key].shape[0] == 3, f"Expected 3 channels for video key {key}"
# Depth frames are single-channel (1, H, W) after dequantization;
# standard RGB frames keep the 3-channel layout.
expected_channels = 1 if key in depth_keys else 3
assert item[key].shape[0] == expected_channels, (
f"Expected {expected_channels} channels for video key {key}, got {item[key].shape}"
)
def assert_image_schema_preserved(aggr_ds):
@@ -538,25 +597,31 @@ def test_aggregate_image_datasets(tmp_path, lerobot_dataset_factory):
ds_0_num_episodes = 2
ds_1_num_episodes = 3
# Create two image-based datasets (use_videos=False)
# Create two image-based datasets (use_videos=False) with a mix of RGB
# and depth-marked cameras so the depth path is exercised in image mode.
ds_0 = lerobot_dataset_factory(
root=tmp_path / "image_0",
repo_id=f"{DUMMY_REPO_ID}_image_0",
total_episodes=ds_0_num_episodes,
total_frames=ds_0_num_frames,
use_videos=False, # Image-based dataset
use_videos=False,
camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH,
)
ds_1 = lerobot_dataset_factory(
root=tmp_path / "image_1",
repo_id=f"{DUMMY_REPO_ID}_image_1",
total_episodes=ds_1_num_episodes,
total_frames=ds_1_num_frames,
use_videos=False, # Image-based dataset
use_videos=False,
camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH,
)
# Verify source datasets have image keys
assert len(ds_0.meta.image_keys) > 0, "ds_0 should have image keys"
assert len(ds_1.meta.image_keys) > 0, "ds_1 should have image keys"
# And that the depth marker actually made it onto an image feature.
assert len(ds_0.meta.depth_keys) > 0, "ds_0 should expose at least one depth key"
assert len(ds_1.meta.depth_keys) > 0, "ds_1 should expose at least one depth key"
# Aggregate the datasets
aggregate_datasets(
@@ -591,6 +656,7 @@ def test_aggregate_image_datasets(tmp_path, lerobot_dataset_factory):
# Image-specific assertions
assert_image_schema_preserved(aggr_ds)
assert_image_frames_integrity(aggr_ds, ds_0, ds_1)
assert_depth_keys_preserved(aggr_ds, ds_0, ds_1)
# Verify images can be accessed and have correct shape
sample_item = aggr_ds[0]
+45 -4
View File
@@ -59,11 +59,13 @@ def _make_dummy_stats(features: dict) -> dict:
stats = {}
for key, ft in features.items():
if ft["dtype"] in ("image", "video"):
channels = ft["shape"][-1]
stat_shape = (channels, 1, 1)
stats[key] = {
"max": np.ones((3, 1, 1), dtype=np.float32),
"mean": np.full((3, 1, 1), 0.5, dtype=np.float32),
"min": np.zeros((3, 1, 1), dtype=np.float32),
"std": np.full((3, 1, 1), 0.25, dtype=np.float32),
"max": np.ones(stat_shape, dtype=np.float32),
"mean": np.full(stat_shape, 0.5, dtype=np.float32),
"min": np.zeros(stat_shape, dtype=np.float32),
"std": np.full(stat_shape, 0.25, dtype=np.float32),
"count": np.array([5]),
}
elif ft["dtype"] in ("float32", "float64", "int64"):
@@ -142,6 +144,45 @@ def test_create_without_videos_has_no_video_path(tmp_path):
assert meta.video_keys == []
@pytest.mark.parametrize(
("marker_field", "marker_key"),
[
("info", "is_depth_map"),
("info", "video.is_depth_map"),
("video_info", "video.is_depth_map"),
],
ids=["info.is_depth_map", "info.video.is_depth_map_legacy", "video_info.video.is_depth_map_legacy"],
)
def test_depth_keys_property_filters_by_marker(tmp_path, marker_field, marker_key):
"""``depth_keys`` recognises the canonical and the two legacy marker variants."""
depth_feature = {
"dtype": "video",
"shape": (64, 96, 1),
"names": ["height", "width", "channels"],
marker_field: {marker_key: True},
}
features = {
**VIDEO_FEATURES,
"observation.images.laptop_depth": depth_feature,
}
meta = LeRobotDatasetMetadata.create(
repo_id="test/depth_keys",
fps=DEFAULT_FPS,
features=features,
root=tmp_path / f"depth_keys_{marker_field}_{marker_key.replace('.', '_')}",
)
assert set(meta.video_keys) == {"observation.images.laptop", "observation.images.laptop_depth"}
assert meta.depth_keys == ["observation.images.laptop_depth"]
def test_depth_keys_empty_when_no_marker(tmp_path):
meta = LeRobotDatasetMetadata.create(
repo_id="test/no_depth", fps=DEFAULT_FPS, features=VIDEO_FEATURES, root=tmp_path / "no_depth"
)
assert meta.depth_keys == []
def test_create_raises_on_existing_directory(tmp_path):
"""create() raises if root directory already exists."""
root = tmp_path / "existing"
+104 -4
View File
@@ -24,7 +24,7 @@ import torch
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
from lerobot.configs import VideoEncoderConfig
from lerobot.configs import DepthEncoderConfig, VideoEncoderConfig
from lerobot.datasets.dataset_tools import (
add_features,
convert_image_to_video_dataset,
@@ -37,7 +37,9 @@ from lerobot.datasets.dataset_tools import (
split_dataset,
)
from lerobot.datasets.io_utils import load_info
from tests.datasets.test_video_encoding import _add_frames, require_h264, require_libsvtav1
from tests.datasets.test_video_encoding import require_h264, require_hevc, require_libsvtav1
from tests.fixtures.constants import DUMMY_DEPTH_FEATURES, DUMMY_DEPTH_KEY
from tests.fixtures.dataset_factories import add_frames
@pytest.fixture
@@ -1332,9 +1334,107 @@ def test_convert_image_to_video_dataset_subset_episodes(tmp_path):
shutil.rmtree(output_dir)
@require_libsvtav1
@require_hevc
def test_convert_image_to_video_dataset_depth(tmp_path, empty_lerobot_dataset_factory):
"""Depth image features convert to depth videos using the depth encoder.
Mirrors :func:`test_convert_image_to_video_dataset` but with a small local
image dataset that mixes an RGB camera with a depth camera, so the
``depth_keys`` ``depth_encoder`` routing and ``is_depth_map`` preservation
are exercised end-to-end.
"""
features = {
"action": {"dtype": "float32", "shape": (2,), "names": ["a", "b"]},
"observation.images.cam": {
"dtype": "image",
"shape": (64, 96, 3),
"names": ["height", "width", "channels"],
},
"observation.images.depth": {
"dtype": "image",
"shape": (64, 96, 1),
"names": ["height", "width", "channels"],
"info": {"is_depth_map": True},
},
}
source_dataset = empty_lerobot_dataset_factory(
root=tmp_path / "img_ds",
features=features,
use_videos=False,
)
add_frames(source_dataset, num_frames=4)
source_dataset.save_episode()
source_dataset.finalize()
# Source is an image dataset with the depth marker on the depth camera.
assert len(source_dataset.meta.video_keys) == 0
assert "observation.images.depth" in source_dataset.meta.depth_keys
output_dir = tmp_path / "video_ds"
with (
patch("lerobot.datasets.dataset_metadata.get_safe_version") as mock_get_safe_version,
patch("lerobot.datasets.dataset_metadata.snapshot_download") as mock_snapshot_download,
):
mock_get_safe_version.return_value = "v3.0"
mock_snapshot_download.return_value = str(output_dir)
video_dataset = convert_image_to_video_dataset(
dataset=source_dataset,
output_dir=output_dir,
repo_id="dummy/depth_video",
camera_encoder=VideoEncoderConfig(vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30),
depth_encoder=DepthEncoderConfig(vcodec="hevc", pix_fmt="gray12le", g=2, crf=30),
num_workers=1,
)
# Both cameras are now videos, and the depth marker survived the conversion.
assert "observation.images.cam" in video_dataset.meta.video_keys
assert "observation.images.depth" in video_dataset.meta.video_keys
assert "observation.images.depth" in video_dataset.meta.depth_keys
assert "observation.images.cam" not in video_dataset.meta.depth_keys
depth_path = video_dataset.root / video_dataset.meta.get_video_file_path(0, "observation.images.depth")
assert depth_path.exists(), f"Depth video file should exist: {depth_path}"
# ─── reencode_dataset ─────────────────────────────────────────────────
@require_hevc
def test_reencode_dataset_depth_uses_depth_encoder(tmp_path, empty_lerobot_dataset_factory):
"""Depth videos are re-encoded with the depth encoder and keep their depth metadata.
Depth-focused companion to :func:`test_reencode_dataset_multi_key_multiprocessing`.
"""
initial_cfg = DepthEncoderConfig(vcodec="hevc", pix_fmt="gray12le", g=2, crf=30)
dataset = empty_lerobot_dataset_factory(
root=tmp_path / "ds",
features=DUMMY_DEPTH_FEATURES,
use_videos=True,
depth_encoder=initial_cfg,
)
add_frames(dataset, num_frames=4)
dataset.save_episode()
dataset.finalize()
assert DUMMY_DEPTH_KEY in dataset.meta.depth_keys
target_cfg = DepthEncoderConfig(vcodec="hevc", pix_fmt="gray12le", g=6, crf=23)
result = reencode_dataset(dataset, depth_encoder=target_cfg, num_workers=0)
assert result is dataset
persisted_info = load_info(dataset.root)
depth_info = persisted_info.features[DUMMY_DEPTH_KEY].get("info", {})
# Re-encode applied the new codec parameters to the depth video ...
assert DepthEncoderConfig.from_video_info(depth_info) == target_cfg
# ... while preserving the depth marker.
assert depth_info["is_depth_map"] is True
@require_libsvtav1
@require_h264
def test_reencode_dataset_multi_key_multiprocessing(
@@ -1350,9 +1450,9 @@ def test_reencode_dataset_multi_key_multiprocessing(
camera_encoder=initial_cfg,
)
_add_frames(dataset, num_frames=4)
add_frames(dataset, num_frames=4)
dataset.save_episode()
_add_frames(dataset, num_frames=4)
add_frames(dataset, num_frames=4)
dataset.save_episode()
dataset.finalize()
+7 -7
View File
@@ -53,8 +53,8 @@ def _make_frame(features: dict, task: str = "Dummy task") -> dict:
# ── Existing encode_video_worker tests ───────────────────────────────
def test_encode_video_worker_forwards_camera_encoder(tmp_path):
"""_encode_video_worker forwards camera_encoder to encode_video_frames."""
def test_encode_video_worker_forwards_video_encoder(tmp_path):
"""_encode_video_worker forwards video_encoder to encode_video_frames."""
video_key = "observation.images.laptop"
fpath = DEFAULT_IMAGE_PATH.format(image_key=video_key, episode_index=0, frame_index=0)
img_dir = tmp_path / Path(fpath).parent
@@ -74,16 +74,16 @@ def test_encode_video_worker_forwards_camera_encoder(tmp_path):
0,
tmp_path,
fps=30,
camera_encoder=VideoEncoderConfig(vcodec="h264", preset=None),
video_encoder=VideoEncoderConfig(vcodec="h264", preset=None),
encoder_threads=4,
)
assert captured_kwargs["camera_encoder"].vcodec == "h264"
assert captured_kwargs["video_encoder"].vcodec == "h264"
assert captured_kwargs["encoder_threads"] == 4
def test_encode_video_worker_default_camera_encoder(tmp_path):
"""_encode_video_worker passes None camera_encoder which encode_video_frames defaults."""
def test_encode_video_worker_default_video_encoder(tmp_path):
"""_encode_video_worker passes None video_encoder which encode_video_frames defaults."""
video_key = "observation.images.laptop"
fpath = DEFAULT_IMAGE_PATH.format(image_key=video_key, episode_index=0, frame_index=0)
img_dir = tmp_path / Path(fpath).parent
@@ -100,7 +100,7 @@ def test_encode_video_worker_default_camera_encoder(tmp_path):
with patch("lerobot.datasets.dataset_writer.encode_video_frames", side_effect=mock_encode):
_encode_video_worker(video_key, 0, tmp_path, fps=30)
assert captured_kwargs["camera_encoder"] is None
assert captured_kwargs["video_encoder"] is None
assert captured_kwargs["encoder_threads"] is None
+6 -1
View File
@@ -1516,10 +1516,15 @@ def test_valid_video_codecs_constant():
assert "h264" in VALID_VIDEO_CODECS
assert "hevc" in VALID_VIDEO_CODECS
assert "libsvtav1" in VALID_VIDEO_CODECS
assert "ffv1" in VALID_VIDEO_CODECS
assert "auto" in VALID_VIDEO_CODECS
assert "h264_videotoolbox" in VALID_VIDEO_CODECS
assert "h264_nvenc" in VALID_VIDEO_CODECS
assert len(VALID_VIDEO_CODECS) == 10
assert "h264_vaapi" in VALID_VIDEO_CODECS
assert "h264_qsv" in VALID_VIDEO_CODECS
assert "hevc_videotoolbox" in VALID_VIDEO_CODECS
assert "hevc_nvenc" in VALID_VIDEO_CODECS
assert len(VALID_VIDEO_CODECS) == 11
def test_delta_timestamps_with_episodes_filter(tmp_path, empty_lerobot_dataset_factory):
+242
View File
@@ -0,0 +1,242 @@
"""Tests for the depth-integration feature.
Covers:
- ``depth_utils`` quantize/dequantize round-trips and backend agreement.
- Image-writer support for single-channel depth.
- Hardware-feature depth flag routing.
- Feature-to-file-format routing through the dataset writer.
Depth metadata detection on ``LeRobotDatasetMetadata.depth_keys`` lives in
``test_dataset_metadata.py``. Depth video encoding/decoding lives in
``test_video_encoding.py``.
"""
from pathlib import Path
import pytest
pytest.importorskip("av", reason="av is required (install lerobot[dataset])")
import av
import numpy as np
import PIL.Image
import pytest
import torch
from lerobot.configs import DepthEncoderConfig
from lerobot.configs.video import DEFAULT_DEPTH_MAX, DEFAULT_DEPTH_MIN, DEPTH_QMAX
from lerobot.datasets.depth_utils import dequantize_depth, quantize_depth
from lerobot.datasets.image_writer import image_array_to_pil_image, write_image
from tests.fixtures.constants import (
DEFAULT_FPS,
DUMMY_CAMERA_FEATURES,
DUMMY_CAMERA_FEATURES_WITH_DEPTH,
DUMMY_CHW,
DUMMY_DEPTH_CAMERA_FEATURES,
DUMMY_REPO_ID,
)
from tests.fixtures.dataset_factories import add_frames
_, H, W = DUMMY_CHW
def _depth_metres_ramp() -> np.ndarray:
"""Linearly-spaced float32 depth in metres covering the default range."""
return np.linspace(DEFAULT_DEPTH_MIN, DEFAULT_DEPTH_MAX, H * W, dtype=np.float32).reshape(H, W)
# ── 1. Quantize / dequantize round-trips ──────────────────────────────
class TestQuantizeDequantize:
"""Numerical contract of ``quantize_depth`` / ``dequantize_depth``."""
@pytest.mark.parametrize("use_log", [False, True])
@pytest.mark.parametrize("output_unit", ["m", "mm"])
@pytest.mark.parametrize("output_channel_last", [False, True])
def test_roundtrip(self, use_log, output_unit, output_channel_last):
"""quantize → dequantize recovers depth; layout and unit are honored."""
depth = _depth_metres_ramp()
quantized = quantize_depth(depth, use_log=use_log, video_backend=None)
recovered = dequantize_depth(
quantized,
use_log=use_log,
output_unit=output_unit,
output_tensor=False,
output_channel_last=output_channel_last,
)
expected_shape = (H, W, 1) if output_channel_last else (1, H, W)
assert recovered.shape == expected_shape
recovered_m = recovered.astype(np.float32)
if output_unit == "mm":
recovered_m = recovered_m / 1000.0
recovered_2d = recovered_m[..., 0] if output_channel_last else recovered_m[0]
if use_log:
# Log mode: tighter near-range error than far-range (the whole point).
near = depth < 1.0
far = depth > 8.0
err_near = np.abs(recovered_2d[near] - depth[near])
err_far = np.abs(recovered_2d[far] - depth[far])
assert err_near.mean() < err_far.mean()
else:
# Linear mode: bounded by quant step + 1 mm of unit-conversion rounding.
tol = (DEFAULT_DEPTH_MAX - DEFAULT_DEPTH_MIN) / DEPTH_QMAX + 1e-3
np.testing.assert_allclose(recovered_2d, depth, atol=tol)
@pytest.mark.parametrize("use_log", [False, True])
@pytest.mark.parametrize("output_unit", ["m", "mm"])
def test_numpy_torch_agree(self, use_log, output_unit):
"""Batched torch path produces the same values as the numpy path."""
batch_size = 3
per_frame = np.linspace(0, DEPTH_QMAX, H * W, dtype=np.uint16).reshape(H, W)
batch_np = np.broadcast_to(per_frame[None, None, ...], (batch_size, 1, H, W)).copy()
batch_t = torch.from_numpy(batch_np.astype(np.int32)) # torch.uint16 support is patchy.
ref = dequantize_depth(batch_np, use_log=use_log, output_unit=output_unit, output_tensor=False)
out = dequantize_depth(batch_t, use_log=use_log, output_unit=output_unit, output_tensor=True)
assert isinstance(out, torch.Tensor)
assert out.shape == (batch_size, 1, H, W)
# ``m``: float32 noise (~10 µm in log mode, after ``exp``) — still 200× below the ~2 mm quant step.
# ``mm`` + tensor stays in float32 (no uint16 round-trip), so allow 1 mm slop.
atol = 1e-5 if output_unit == "m" else 1.0
np.testing.assert_allclose(out.cpu().numpy().astype(np.float64), ref.astype(np.float64), atol=atol)
@pytest.mark.parametrize(
"input_shape,output_shape",
[
((H, W), (1, H, W)),
((1, H, W), (1, H, W)),
((H, W, 1), (1, H, W)),
((3, 1, H, W), (3, 1, H, W)),
((3, H, W, 1), (3, 1, H, W)),
],
)
def test_input_layouts_accepted(self, input_shape, output_shape):
"""All documented input layouts decode to the channel-first default."""
quantized = np.full(input_shape, DEPTH_QMAX // 2, dtype=np.uint16)
out = dequantize_depth(quantized, output_unit="m", output_tensor=False)
assert out.shape == output_shape
def test_pyav_frame_roundtrip(self):
"""quantize → av.VideoFrame → dequantize works."""
depth = _depth_metres_ramp()
frame = quantize_depth(depth, use_log=False, video_backend="pyav")
assert isinstance(frame, av.VideoFrame)
recovered = dequantize_depth(frame, use_log=False, output_unit="m", output_tensor=False)
assert recovered.shape == (1, H, W)
tol = (DEFAULT_DEPTH_MAX - DEFAULT_DEPTH_MIN) / DEPTH_QMAX + 1e-3
np.testing.assert_allclose(recovered[0], depth, atol=tol)
def test_invalid_log_params_raises(self):
with pytest.raises(ValueError, match=r"depth_min \+ shift must be positive"):
quantize_depth(_depth_metres_ramp(), depth_min=1.0, shift=-2.0, use_log=True, video_backend=None)
# ── 2. Image writer depth support ─────────────────────────────────────
class TestImageWriterDepth:
"""``image_array_to_pil_image`` and ``write_image`` for depth maps."""
@pytest.mark.parametrize("dtype,expected_mode", [(np.uint16, "I;16"), (np.float32, "F")])
@pytest.mark.parametrize("shape", [(H, W), (H, W, 1), (1, H, W)])
def test_pil_depth_modes_and_squeeze(self, dtype, expected_mode, shape):
"""Single-channel depth converts to PIL with the right mode and (W, H) size."""
arr = np.zeros(shape, dtype=dtype)
img = image_array_to_pil_image(arr)
assert img.mode == expected_mode
assert img.size == (W, H)
def test_write_image_tiff_roundtrip(self, tmp_path):
"""uint16 depth round-trips through .tiff."""
arr = np.arange(H * W, dtype=np.uint16).reshape(H, W)
fpath = tmp_path / "depth.tiff"
write_image(arr, fpath)
with PIL.Image.open(fpath) as loaded:
recovered = np.array(loaded)
np.testing.assert_array_equal(recovered, arr)
# ── 3. Hardware-feature → depth flag ──────────────────────────────────
class TestHwToDatasetFeaturesDepth:
"""``hw_to_dataset_features`` flags single-channel cameras as depth."""
@pytest.mark.parametrize("channels,is_depth", [(1, True), (3, False)])
def test_depth_marker_by_channels(self, channels, is_depth):
from lerobot.utils.feature_utils import hw_to_dataset_features
features = hw_to_dataset_features({"cam": (480, 640, channels)}, prefix="observation")
assert features["observation.images.cam"]["info"]["is_depth_map"] is is_depth
def test_invalid_channel_count_raises(self):
from lerobot.utils.feature_utils import hw_to_dataset_features
with pytest.raises(ValueError, match="Expected a 3-tuple"):
hw_to_dataset_features({"cam": (480, 640, 2)}, prefix="observation")
# ── 4. Feature-to-file-format routing ────────────────────────────────
# Keys derived from DUMMY_CAMERA_FEATURES_WITH_DEPTH; pick one RGB and the depth camera.
RGB_KEY = next(iter(DUMMY_CAMERA_FEATURES))
DEPTH_KEY = next(iter(DUMMY_DEPTH_CAMERA_FEATURES))
class TestFeatureFileRouting:
"""Depth vs RGB features route to the correct file format."""
NUM_FRAMES = 5
def test_image_mode_depth_tiff_rgb_png(self, tmp_path, features_factory):
"""Without video encoding: depth → .tiff, RGB → .png."""
from lerobot.datasets.lerobot_dataset import LeRobotDataset
features = features_factory(camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH, use_videos=False)
dataset = LeRobotDataset.create(
repo_id=DUMMY_REPO_ID,
fps=DEFAULT_FPS,
features=features,
root=tmp_path / "ds",
use_videos=False,
)
add_frames(dataset, num_frames=self.NUM_FRAMES)
buf = dataset.writer.episode_buffer
assert all(Path(p).suffix == ".tiff" for p in buf[DEPTH_KEY])
assert all(Path(p).suffix == ".png" for p in buf[RGB_KEY])
dataset.save_episode()
dataset.finalize()
def test_video_mode_depth_uses_depth_encoder(self, tmp_path, features_factory):
"""With streaming video encoding: depth → DepthEncoderConfig, RGB does not."""
from lerobot.datasets.lerobot_dataset import LeRobotDataset
features = features_factory(camera_features=DUMMY_CAMERA_FEATURES_WITH_DEPTH, use_videos=True)
dataset = LeRobotDataset.create(
repo_id=DUMMY_REPO_ID,
fps=DEFAULT_FPS,
features=features,
root=tmp_path / "ds",
use_videos=True,
streaming_encoding=True,
)
add_frames(dataset, num_frames=self.NUM_FRAMES)
encoder = dataset.writer._streaming_encoder
assert encoder is not None
assert isinstance(encoder._threads[DEPTH_KEY].video_encoder, DepthEncoderConfig)
assert not isinstance(encoder._threads[RGB_KEY].video_encoder, DepthEncoderConfig)
dataset.save_episode()
dataset.finalize()
+1 -1
View File
@@ -94,7 +94,7 @@ def test_image_array_to_pil_image_pytorch_format(img_array_factory):
def test_image_array_to_pil_image_single_channel(img_array_factory):
img_array = img_array_factory(channels=1)
with pytest.raises(NotImplementedError):
with pytest.raises(ValueError, match="Unsupported single-channel image dtype"):
image_array_to_pil_image(img_array)
+5 -10
View File
@@ -61,9 +61,7 @@ class TestCameraEncoderThread:
encoder_thread = _CameraEncoderThread(
video_path=video_path,
fps=fps,
vcodec=enc_cfg.vcodec,
pix_fmt=enc_cfg.pix_fmt,
codec_options=enc_cfg.get_codec_options(as_strings=True),
video_encoder=enc_cfg,
frame_queue=frame_queue,
result_queue=result_queue,
stop_event=stop_event,
@@ -112,9 +110,7 @@ class TestCameraEncoderThread:
encoder_thread = _CameraEncoderThread(
video_path=video_path,
fps=fps,
vcodec=enc_cfg.vcodec,
pix_fmt=enc_cfg.pix_fmt,
codec_options=enc_cfg.get_codec_options(as_strings=True),
video_encoder=enc_cfg,
frame_queue=frame_queue,
result_queue=result_queue,
stop_event=stop_event,
@@ -146,9 +142,7 @@ class TestCameraEncoderThread:
encoder_thread = _CameraEncoderThread(
video_path=video_path,
fps=fps,
vcodec=enc_cfg.vcodec,
pix_fmt=enc_cfg.pix_fmt,
codec_options=enc_cfg.get_codec_options(as_strings=True),
video_encoder=enc_cfg,
frame_queue=frame_queue,
result_queue=result_queue,
stop_event=stop_event,
@@ -391,7 +385,8 @@ class TestStreamingVideoEncoder:
# Verify codec options include thread tuning for libsvtav1 (lp=…)
thread = encoder._threads[f"{OBS_IMAGES}.cam"]
assert "svtav1-params" in thread.codec_options or "threads" in thread.codec_options
codec_opts = thread.video_encoder.get_codec_options(encoder_threads=thread.encoder_threads)
assert "svtav1-params" in codec_opts or "threads" in codec_opts
# Feed some frames and finish to ensure it works end-to-end
num_frames = 10
+301 -72
View File
@@ -26,7 +26,7 @@ pytest.importorskip("av", reason="av is required (install lerobot[dataset])")
import av # noqa: E402
from lerobot.configs import VALID_VIDEO_CODECS, VideoEncoderConfig
from lerobot.configs import VALID_VIDEO_CODECS, DepthEncoderConfig, VideoEncoderConfig
from lerobot.datasets.image_writer import write_image
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pyav_utils import get_codec
@@ -37,7 +37,15 @@ from lerobot.datasets.video_utils import (
get_video_info,
reencode_video,
)
from tests.fixtures.constants import DUMMY_VIDEO_INFO
from tests.fixtures.constants import (
DUMMY_DEPTH_FEATURES,
DUMMY_DEPTH_KEY,
DUMMY_DEPTH_VIDEO_INFO_FULL,
DUMMY_VIDEO_FEATURES,
DUMMY_VIDEO_INFO,
DUMMY_VIDEO_KEY,
)
from tests.fixtures.dataset_factories import add_frames
# Per-codec skip markers — validation tests only fire when the codec is available
@@ -48,12 +56,67 @@ def _require_encoder(vcodec: str) -> pytest.MarkDecorator:
require_libsvtav1 = _require_encoder("libsvtav1")
require_h264 = _require_encoder("h264")
require_hevc = _require_encoder("hevc")
require_videotoolbox = _require_encoder("h264_videotoolbox")
require_nvenc = _require_encoder("h264_nvenc")
require_vaapi = _require_encoder("h264_vaapi")
require_qsv = _require_encoder("h264_qsv")
TEST_ARTIFACTS_DIR = Path(__file__).parent.parent / "artifacts" / "encoded_videos"
def _write_color_frames(imgs_dir: Path, num_frames: int = 4, height: int = 64, width: int = 96) -> None:
imgs_dir.mkdir(parents=True, exist_ok=True)
for i in range(num_frames):
arr = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
write_image(arr, imgs_dir / f"frame-{i:06d}.png")
def _write_depth_frames(imgs_dir: Path, num_frames: int = 4, height: int = 64, width: int = 96) -> None:
"""Write synthetic uint16 depth TIFFs (millimetres) for depth encoder tests.
Uses a smooth linear ramp + per-frame offset (not white noise) so HEVC Main 12
on ``gray12le`` compresses well. Values span ~100 mm to 10 m, covering most
of the default ``[DEPTH_MIN, DEPTH_MAX]`` metres range after
``quantize_depth(input_unit="auto"="mm")``.
"""
imgs_dir.mkdir(parents=True, exist_ok=True)
base = np.linspace(100.0, 10_000.0, height * width, dtype=np.float32).reshape(height, width)
for i in range(num_frames):
arr = (base + 50.0 * i).clip(0, 65535).astype(np.uint16)
write_image(arr, imgs_dir / f"frame-{i:06d}.tiff")
def _encode_video(
path: Path,
num_frames: int = 4,
fps: int = 30,
cfg: VideoEncoderConfig | None = None,
depth: bool = False,
) -> Path:
"""Write synthetic frames to a temp dir and encode them to ``path``.
``depth=False`` writes uint8 RGB PNG noise and encodes with ``cfg``
(defaulting to the library default). ``depth=True`` writes synthetic uint16
depth TIFFs and encodes with ``cfg`` or a default :class:`DepthEncoderConfig`
(HEVC Main 12 / ``gray12le``).
"""
imgs_dir = path.parent / f"imgs_{path.stem}"
if depth:
_write_depth_frames(imgs_dir, num_frames=num_frames)
cfg = cfg or DepthEncoderConfig()
else:
_write_color_frames(imgs_dir, num_frames=num_frames)
encode_video_frames(imgs_dir, path, fps=fps, video_encoder=cfg, overwrite=True)
return path
def _read_feature_info(dataset: LeRobotDataset, key: str = DUMMY_VIDEO_KEY) -> dict:
info = json.loads((dataset.root / INFO_PATH).read_text())
return info["features"][key]["info"]
# ─── VideoEncoderConfig / codec options ──────────────────────────────
@@ -87,7 +150,7 @@ class TestCodecOptions:
assert opts["q:v"] == 40
assert "crf" not in opts
@_require_encoder("h264_nvenc")
@require_nvenc
def test_nvenc_options(self):
cfg = VideoEncoderConfig(vcodec="h264_nvenc", g=2, crf=25, preset=None)
opts = cfg.get_codec_options()
@@ -96,12 +159,12 @@ class TestCodecOptions:
assert "crf" not in opts
assert opts["g"] == 2
@_require_encoder("h264_vaapi")
@require_vaapi
def test_vaapi_options(self):
cfg = VideoEncoderConfig(vcodec="h264_vaapi", crf=28, preset=None)
assert cfg.get_codec_options()["qp"] == 28
@_require_encoder("h264_qsv")
@require_qsv
def test_qsv_options(self):
cfg = VideoEncoderConfig(vcodec="h264_qsv", crf=25, preset=None)
assert cfg.get_codec_options()["global_quality"] == 25
@@ -313,59 +376,6 @@ class TestEncoderDetection:
assert "h264_nvenc" in VALID_VIDEO_CODECS
TEST_ARTIFACTS_DIR = Path(__file__).parent.parent / "artifacts" / "encoded_videos"
# Default video feature set used by persistence tests.
VIDEO_FEATURES = {
"observation.images.cam": {
"dtype": "video",
"shape": (64, 96, 3),
"names": ["height", "width", "channels"],
},
"action": {"dtype": "float32", "shape": (2,), "names": ["a", "b"]},
}
VIDEO_KEY = "observation.images.cam"
def _write_frames(imgs_dir: Path, num_frames: int = 4, height: int = 64, width: int = 96) -> None:
imgs_dir.mkdir(parents=True, exist_ok=True)
for i in range(num_frames):
arr = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
write_image(arr, imgs_dir / f"frame-{i:06d}.png")
def _encode_video(
path: Path, num_frames: int = 4, fps: int = 30, cfg: VideoEncoderConfig | None = None
) -> Path:
imgs_dir = path.parent / f"imgs_{path.stem}"
_write_frames(imgs_dir, num_frames=num_frames)
encode_video_frames(imgs_dir, path, fps=fps, camera_encoder=cfg, overwrite=True)
return path
def _read_feature_info(dataset: LeRobotDataset) -> dict:
info = json.loads((dataset.root / INFO_PATH).read_text())
return info["features"][VIDEO_KEY]["info"]
def _add_frames(dataset: LeRobotDataset, num_frames: int, video_keys: list[str] | None = None) -> None:
from lerobot.utils.constants import DEFAULT_FEATURES
if video_keys is None:
video_keys = dataset.meta.video_keys
for _ in range(num_frames):
frame: dict = {"task": "test"}
for key, ft in dataset.meta.features.items():
if key in DEFAULT_FEATURES:
continue
shape = ft["shape"]
if key in video_keys:
frame[key] = np.random.randint(0, 256, shape, dtype=np.uint8)
else:
frame[key] = np.zeros(shape, dtype=np.float32)
dataset.add_frame(frame)
class TestGetVideoInfo:
def test_returns_all_stream_fields(self):
info = get_video_info(TEST_ARTIFACTS_DIR / "clip_4frames.mp4")
@@ -375,7 +385,7 @@ class TestGetVideoInfo:
assert info["video.pix_fmt"] == "yuv420p"
assert info["video.fps"] == 30
assert info["video.channels"] == 3
assert info["video.is_depth_map"] is False
assert info["is_depth_map"] is False
assert info["has_audio"] is False
assert "video.g" not in info
assert "video.crf" not in info
@@ -385,7 +395,7 @@ class TestGetVideoInfo:
def test_merges_encoder_config_as_video_prefixed_entries(self):
cfg = VideoEncoderConfig(vcodec="libsvtav1", g=2, crf=30, preset=12)
info = get_video_info(TEST_ARTIFACTS_DIR / "clip_4frames.mp4", camera_encoder=cfg)
info = get_video_info(TEST_ARTIFACTS_DIR / "clip_4frames.mp4", video_encoder=cfg)
assert info["video.g"] == 2
assert info["video.crf"] == 30
@@ -398,11 +408,16 @@ class TestGetVideoInfo:
def test_stream_derived_keys_take_precedence_over_config(self):
cfg = VideoEncoderConfig(vcodec="libsvtav1", pix_fmt="yuv420p")
info = get_video_info(TEST_ARTIFACTS_DIR / "clip_4frames.mp4", camera_encoder=cfg)
info = get_video_info(TEST_ARTIFACTS_DIR / "clip_4frames.mp4", video_encoder=cfg)
assert info["video.codec"] # populated from stream, not from config's vcodec
assert info["video.pix_fmt"] == "yuv420p"
def test_depth_encoder_config_sets_is_depth_map_true(self):
"""A ``DepthEncoderConfig`` causes ``get_video_info`` to mark the stream as depth."""
info = get_video_info(TEST_ARTIFACTS_DIR / "clip_4frames.mp4", video_encoder=DepthEncoderConfig())
assert info["is_depth_map"] is True
class TestEncodeVideoFrames:
@require_libsvtav1
@@ -434,7 +449,7 @@ class TestEncodeVideoFrames:
def test_overwrite_false_skips_existing_file(self, tmp_path):
imgs_dir = tmp_path / "imgs"
_write_frames(imgs_dir)
_write_color_frames(imgs_dir)
video_path = tmp_path / "out.mp4"
sentinel = b"pre-existing content"
video_path.write_bytes(sentinel)
@@ -446,7 +461,7 @@ class TestEncodeVideoFrames:
@require_libsvtav1
def test_overwrite_true_replaces_existing_file(self, tmp_path):
imgs_dir = tmp_path / "imgs"
_write_frames(imgs_dir)
_write_color_frames(imgs_dir)
video_path = tmp_path / "out.mp4"
video_path.write_bytes(b"stale content")
@@ -461,7 +476,7 @@ class TestEncodeVideoFrames:
cfg = VideoEncoderConfig(vcodec="libsvtav1", g=4, crf=25, preset=10)
video_path = _encode_video(tmp_path / "out.mp4", num_frames=4, fps=30, cfg=cfg)
info = get_video_info(video_path, camera_encoder=cfg)
info = get_video_info(video_path, video_encoder=cfg)
# Stream-derived
assert info["video.height"] == 64
@@ -470,7 +485,7 @@ class TestEncodeVideoFrames:
assert info["video.codec"] == "av1"
assert info["video.pix_fmt"] == "yuv420p"
assert info["video.fps"] == 30
assert info["video.is_depth_map"] is False
assert info["is_depth_map"] is False
assert info["has_audio"] is False
# Encoder config
assert info["video.g"] == 4
@@ -488,14 +503,14 @@ class TestReencodeVideo:
src = TEST_ARTIFACTS_DIR / "clip_4frames.mp4"
out = tmp_path / "reencoded.mp4"
cfg = VideoEncoderConfig(vcodec="h264", g=6, crf=23, pix_fmt="yuv444p")
reencode_video(src, out, camera_encoder=cfg, overwrite=True)
reencode_video(src, out, video_encoder=cfg, overwrite=True)
assert out.exists()
with av.open(str(out)) as container:
n_frames = sum(1 for _ in container.decode(video=0))
assert n_frames == 4
info = get_video_info(out, camera_encoder=cfg)
info = get_video_info(out, video_encoder=cfg)
assert info["video.codec"] == "h264"
assert info["video.pix_fmt"] == "yuv444p"
assert info["video.height"] == 64
@@ -567,10 +582,10 @@ class TestEncoderConfigPersistence:
def test_first_episode_save_persists_encoder_config(self, tmp_path, empty_lerobot_dataset_factory):
cfg = VideoEncoderConfig(vcodec="libsvtav1", g=2, crf=30, preset=12)
dataset = empty_lerobot_dataset_factory(
root=tmp_path / "ds", features=VIDEO_FEATURES, use_videos=True, camera_encoder=cfg
root=tmp_path / "ds", features=DUMMY_VIDEO_FEATURES, use_videos=True, camera_encoder=cfg
)
_add_frames(dataset, num_frames=4)
add_frames(dataset, num_frames=4)
dataset.save_episode()
dataset.finalize()
@@ -590,14 +605,14 @@ class TestEncoderConfigPersistence:
def test_second_episode_does_not_overwrite_encoder_fields(self, tmp_path, empty_lerobot_dataset_factory):
cfg = VideoEncoderConfig(vcodec="libsvtav1", g=2, crf=30, preset=12)
dataset = empty_lerobot_dataset_factory(
root=tmp_path / "ds", features=VIDEO_FEATURES, use_videos=True, camera_encoder=cfg
root=tmp_path / "ds", features=DUMMY_VIDEO_FEATURES, use_videos=True, camera_encoder=cfg
)
_add_frames(dataset, num_frames=4)
add_frames(dataset, num_frames=4)
dataset.save_episode()
first_info = dict(_read_feature_info(dataset))
_add_frames(dataset, num_frames=4)
add_frames(dataset, num_frames=4)
dataset.save_episode()
dataset.finalize()
@@ -624,3 +639,217 @@ class TestFromVideoInfo:
# ``{}`` placeholder (typical after a merge with disagreeing sources)
# must not leak into the reconstructed config.
assert cfg.extra_options == VideoEncoderConfig().extra_options
# ─── Depth-specific encoding tests ────────────────────────────────────
class TestEncodeDepthVideoFrames:
"""Depth mirror of :class:`TestEncodeVideoFrames`.
Exercises ``encode_video_frames`` end-to-end through
:class:`DepthEncoderConfig` (HEVC Main 12 / ``gray12le``) on synthetic
uint16 depth TIFFs.
"""
@require_hevc
def test_produces_readable_file(self, tmp_path):
video_path = _encode_video(tmp_path / "out.mp4", depth=True)
assert video_path.exists()
info = get_video_info(video_path, video_encoder=DepthEncoderConfig())
assert info["video.height"] == 64
assert info["video.width"] == 96
assert info["video.codec"] == "hevc"
assert info["video.pix_fmt"] == "gray12le"
assert info["video.channels"] == 1
assert info["is_depth_map"] is True
@require_hevc
def test_frame_count_and_duration_match_input(self, tmp_path):
num_frames = 10
fps = 30
video_path = _encode_video(tmp_path / "out.mp4", num_frames=num_frames, fps=fps, depth=True)
with av.open(str(video_path)) as container:
stream = container.streams.video[0]
actual_frames = sum(1 for _ in container.decode(stream))
duration = (
float(stream.duration * stream.time_base)
if stream.duration is not None
else float(container.duration / av.time_base)
)
assert actual_frames == num_frames
assert abs(duration - num_frames / fps) < 0.1
def test_overwrite_false_skips_existing_file(self, tmp_path):
"""Codec-agnostic: file-system semantics must hold even without an HEVC encoder."""
imgs_dir = tmp_path / "imgs"
_write_depth_frames(imgs_dir)
video_path = tmp_path / "out.mp4"
sentinel = b"pre-existing depth content"
video_path.write_bytes(sentinel)
encode_video_frames(imgs_dir, video_path, fps=30, video_encoder=DepthEncoderConfig(), overwrite=False)
assert video_path.read_bytes() == sentinel
@require_hevc
def test_overwrite_true_replaces_existing_file(self, tmp_path):
imgs_dir = tmp_path / "imgs"
_write_depth_frames(imgs_dir)
video_path = tmp_path / "out.mp4"
video_path.write_bytes(b"stale content")
encode_video_frames(imgs_dir, video_path, fps=30, video_encoder=DepthEncoderConfig(), overwrite=True)
info = get_video_info(video_path, video_encoder=DepthEncoderConfig())
assert info["video.height"] == 64
assert info["video.pix_fmt"] == "gray12le"
assert info["is_depth_map"] is True
@require_hevc
def test_custom_encoder_config_fields_stored_in_info(self, tmp_path):
"""All stream-derived and depth-encoder config fields are present after encoding."""
cfg = DepthEncoderConfig(
vcodec="hevc",
pix_fmt="gray12le",
g=4,
crf=25,
depth_min=0.05,
depth_max=8.0,
shift=2.5,
use_log=False,
)
video_path = _encode_video(tmp_path / "out.mp4", num_frames=4, fps=30, cfg=cfg, depth=True)
info = get_video_info(video_path, video_encoder=cfg)
# Stream-derived
assert info["video.height"] == 64
assert info["video.width"] == 96
assert info["video.channels"] == 1
assert info["video.codec"] == "hevc"
assert info["video.pix_fmt"] == "gray12le"
assert info["video.fps"] == 30
assert info["is_depth_map"] is True
assert info["has_audio"] is False
# Base encoder config
assert info["video.g"] == 4
assert info["video.crf"] == 25
assert info["video.fast_decode"] == 0
assert info["video.video_backend"] == "pyav"
assert info["video.extra_options"] == {}
# Depth-specific tuning
assert info["video.depth_min"] == 0.05
assert info["video.depth_max"] == 8.0
assert info["video.shift"] == 2.5
assert info["video.use_log"] is False
class TestDepthEncoderConfigPersistence:
"""Depth mirror of :class:`TestEncoderConfigPersistence`.
``DepthEncoderConfig`` must be stored as ``video.<field>`` entries
(including the depth-specific ``depth_min`` / ``depth_max`` / ``shift`` /
``use_log``) under ``info["features"][<depth_key>]["info"]`` when the
first episode is saved.
"""
@require_hevc
def test_first_episode_save_persists_depth_encoder_config(self, tmp_path, empty_lerobot_dataset_factory):
cfg = DepthEncoderConfig(
vcodec="hevc",
pix_fmt="gray12le",
g=2,
crf=30,
depth_min=0.05,
depth_max=8.0,
shift=2.5,
use_log=False,
)
dataset = empty_lerobot_dataset_factory(
root=tmp_path / "ds", features=DUMMY_DEPTH_FEATURES, use_videos=True, depth_encoder=cfg
)
add_frames(dataset, num_frames=4)
dataset.save_episode()
dataset.finalize()
info = _read_feature_info(dataset, key=DUMMY_DEPTH_KEY)
# Stream-derived
assert info["video.height"] == 64
assert info["video.width"] == 96
assert info["video.fps"] == 30
assert info["video.codec"] == "hevc"
assert info["video.pix_fmt"] == "gray12le"
assert info["is_depth_map"] is True
# Base encoder config
assert info["video.g"] == 2
assert info["video.crf"] == 30
assert info["video.fast_decode"] == 0
assert info["video.video_backend"] == "pyav"
assert info["video.extra_options"] == {}
# Depth-specific tuning
assert info["video.depth_min"] == 0.05
assert info["video.depth_max"] == 8.0
assert info["video.shift"] == 2.5
assert info["video.use_log"] is False
@require_hevc
def test_second_episode_does_not_overwrite_depth_encoder_fields(
self, tmp_path, empty_lerobot_dataset_factory
):
cfg = DepthEncoderConfig(
vcodec="hevc",
pix_fmt="gray12le",
g=2,
crf=30,
depth_min=0.05,
depth_max=8.0,
shift=2.5,
use_log=False,
)
dataset = empty_lerobot_dataset_factory(
root=tmp_path / "ds", features=DUMMY_DEPTH_FEATURES, use_videos=True, depth_encoder=cfg
)
add_frames(dataset, num_frames=4)
dataset.save_episode()
first_info = dict(_read_feature_info(dataset, key=DUMMY_DEPTH_KEY))
add_frames(dataset, num_frames=4)
dataset.save_episode()
dataset.finalize()
assert _read_feature_info(dataset, key=DUMMY_DEPTH_KEY) == first_info
class TestDepthFromVideoInfo:
"""``DepthEncoderConfig.from_video_info`` reconstructs a depth encoder
config from the ``video.*`` keys persisted in a dataset's ``info.json``.
Depth mirror of :class:`TestFromVideoInfo`.
"""
@require_hevc
def test_reconstructs_from_dummy_depth_video_info(self):
cfg = DepthEncoderConfig.from_video_info(DUMMY_DEPTH_VIDEO_INFO_FULL)
# No alias for ``"hevc"``; the canonical stream codec is reused as-is.
assert cfg.vcodec == "hevc"
assert cfg.pix_fmt == DUMMY_DEPTH_VIDEO_INFO_FULL["video.pix_fmt"]
assert cfg.g == DUMMY_DEPTH_VIDEO_INFO_FULL["video.g"]
assert cfg.crf == DUMMY_DEPTH_VIDEO_INFO_FULL["video.crf"]
assert cfg.fast_decode == DUMMY_DEPTH_VIDEO_INFO_FULL["video.fast_decode"]
assert cfg.video_backend == DUMMY_DEPTH_VIDEO_INFO_FULL["video.video_backend"]
# ``{}`` placeholder (typical after a merge with disagreeing sources)
# must not leak into the reconstructed config.
assert cfg.extra_options == DepthEncoderConfig().extra_options
# Depth-specific tuning round-trips through ``info.json``.
assert cfg.depth_min == DUMMY_DEPTH_VIDEO_INFO_FULL["video.depth_min"]
assert cfg.depth_max == DUMMY_DEPTH_VIDEO_INFO_FULL["video.depth_max"]
assert cfg.shift == DUMMY_DEPTH_VIDEO_INFO_FULL["video.shift"]
assert cfg.use_log == DUMMY_DEPTH_VIDEO_INFO_FULL["video.use_log"]
+45 -1
View File
@@ -39,12 +39,56 @@ DUMMY_VIDEO_INFO = {
"video.crf": 30,
"video.preset": 12,
"video.fast_decode": 0,
"video.is_depth_map": False,
"is_depth_map": False,
"has_audio": False,
}
DUMMY_CAMERA_FEATURES = {
"laptop": {"shape": (64, 96, 3), "names": ["height", "width", "channels"], "info": DUMMY_VIDEO_INFO},
"phone": {"shape": (64, 96, 3), "names": ["height", "width", "channels"], "info": DUMMY_VIDEO_INFO},
}
DUMMY_DEPTH_VIDEO_INFO = {
**DUMMY_VIDEO_INFO,
"is_depth_map": True,
}
DUMMY_DEPTH_VIDEO_INFO_FULL = {
**{k: v for k, v in DUMMY_VIDEO_INFO.items() if k != "video.preset"},
"video.codec": "hevc",
"video.pix_fmt": "gray12le",
"is_depth_map": True,
"video.depth_min": 0.05,
"video.depth_max": 8.0,
"video.shift": 2.5,
"video.use_log": True,
}
DUMMY_DEPTH_CAMERA_FEATURES = {
"laptop_depth": {
"shape": (64, 96, 1),
"names": ["height", "width", "channels"],
"info": DUMMY_DEPTH_VIDEO_INFO,
},
}
DUMMY_CAMERA_FEATURES_WITH_DEPTH = {**DUMMY_CAMERA_FEATURES, **DUMMY_DEPTH_CAMERA_FEATURES}
DUMMY_CHW = (3, 96, 128)
DUMMY_HWC = (96, 128, 3)
# Default video feature set used by video-encoding persistence tests.
DUMMY_VIDEO_FEATURES = {
"observation.images.cam": {
"dtype": "video",
"shape": (64, 96, 3),
"names": ["height", "width", "channels"],
},
"action": {"dtype": "float32", "shape": (2,), "names": ["a", "b"]},
}
DUMMY_VIDEO_KEY = "observation.images.cam"
DUMMY_DEPTH_FEATURES = {
"observation.images.depth": {
"dtype": "video",
"shape": (64, 96, 1),
"names": ["height", "width", "channels"],
"info": {"is_depth_map": True},
},
"action": {"dtype": "float32", "shape": (2,), "names": ["a", "b"]},
}
DUMMY_DEPTH_KEY = "observation.images.depth"
+38
View File
@@ -49,6 +49,39 @@ from tests.fixtures.constants import (
)
def add_frames(dataset: LeRobotDataset, num_frames: int) -> None:
"""Append ``num_frames`` synthetic frames to ``dataset``.
Generates per-feature payloads from ``dataset.meta``: uint16 depth ramps for
keys in ``dataset.meta.depth_keys``, uint8 random noise for video/image keys,
and float32 zeros for everything else. ``DEFAULT_FEATURES`` (timestamp,
frame_index, ...) are auto-populated by ``add_frame`` and skipped here.
"""
video_keys = dataset.meta.video_keys
depth_keys = dataset.meta.depth_keys
# Smooth gradient base reused per (H, W) to keep depth frames cheap to
# encode (HEVC Main 12 hates white noise).
_depth_base_cache: dict[tuple[int, int], np.ndarray] = {}
for i in range(num_frames):
frame: dict = {"task": "test"}
for key, ft in dataset.meta.features.items():
if key in DEFAULT_FEATURES:
continue
shape = ft["shape"]
if key in depth_keys:
h, w, _ = shape
base = _depth_base_cache.setdefault(
(h, w),
np.linspace(100.0, 10_000.0, h * w, dtype=np.float32).reshape(h, w, 1),
)
frame[key] = (base + 50.0 * i).clip(0, 65535).astype(np.uint16)
elif key in video_keys:
frame[key] = np.random.randint(0, 256, shape, dtype=np.uint8)
else:
frame[key] = np.zeros(shape, dtype=np.float32)
dataset.add_frame(frame)
class LeRobotDatasetFactory(Protocol):
def __call__(self, *args, **kwargs) -> LeRobotDataset: ...
@@ -485,10 +518,14 @@ def lerobot_dataset_factory(
hf_dataset: datasets.Dataset | None = None,
data_files_size_in_mb: float = DEFAULT_DATA_FILE_SIZE_IN_MB,
chunks_size: int = DEFAULT_CHUNK_SIZE,
camera_features: dict | None = None,
**kwargs,
) -> LeRobotDataset:
# Instantiate objects
if info is None:
info_kwargs = {}
if camera_features is not None:
info_kwargs["camera_features"] = camera_features
info = info_factory(
total_episodes=total_episodes,
total_frames=total_frames,
@@ -496,6 +533,7 @@ def lerobot_dataset_factory(
use_videos=use_videos,
data_files_size_in_mb=data_files_size_in_mb,
chunks_size=chunks_size,
**info_kwargs,
)
if stats is None:
stats = stats_factory(features=info.features)