lerobot/tests/datasets/test_image_writer.py
Caroline Pascal 3dd19d043e feat(depth maps): adding support for depth in LeRobot (#3644)
* feat(depth): add depth quantization helpers and tests

* feat(video): add ffv1 to supported codecs

* feat(depth): persist depth metadata

* feat(depth): extend quantization tools to better fit the encoding/decoding pipeline

* feat(depth): plumb DepthEncoderConfig through LeRobotDataset and DatasetWriter

* feat(depth): wire StreamingVideoEncoder + writer to depth encoder

* feat(depth): wire DatasetReader to decode_depth_frames

* feat(cameras/realsense): expose async depth in metric meters

* feat(features): route 2D camera shapes to observation.depth.<key>

* feat(robots/so_follower): emit + populate depth keys when use_depth

* feat(record): plumb DepthEncoderConfig through lerobot-record

* feat(viz): render depth observations as rr.DepthImage in Viridis

* feat(depth maps writer): adding support for raw depth maps recording with image writer

* chore(format): format code

* feat(depth shape): ensuring depth maps shape is always including the channel

* feat(is_depth): simplifying is_depth nested name + legacy support

* fix(stop_event): fixing stop_event race condition in camera classes

* fix(plumbing): fixing missing parts in the depth maps pipeline

* chore(typos): fixing typos

* test(fix): fixing existing tests to still work with latest features

* tests(depth): adding new tests for depth integration validation

* feat(pix_fmt channels): use PyAV to get the number of channels of pixel formats

* feat(refactor): refactor DepthEncoderConfig quantization pipeline, so that the methods do not live in the config class. Add pixel format - channels validation. Move the default pixel format for depth to the config file.

* fix(pre-commit): fixing mutable default value

* fix(info): fixing info metadata update when is_depth_map was set

* tests(typos): fixing typos in tests

* fix(realsense): fixing typo in realsense serial number

* fix(normalization): restricting 255 normalization to non depth/uint8 images only

* fix(typo): fixing typo

* fix(TIFF): add missing quantization and cleanup for TIFF files

* feat(batched dequantization): optimizing dequantize_depth for torch based batched dequantization

* feat(tools): adding depth support in LeRobotDataset edition tools

* test(aggregate): extending aggregation tests to depth frames

* test(cleaning): cleaning up tests

* fix(from_video_info): fixing early validation issue in from_video_info

* fix(typo): fixing typo

* fix(is_depth): adding missing docstrings and is_depth arguments in video decoding functions

Co-authored-by: Wensi (Vince) Ai <59036629+wensi-ai@users.noreply.github.com>

* fix(depth units): fixing depth units output for the realsense cameras

* feat(output unit): adding support for output unit specification at dataset reading/training time

Co-authored-by: Wensi (Vince) Ai <59036629+wensi-ai@users.noreply.github.com>

* test(depth): cleaning up depth tests

* test(depth encoding): updating and cleaning video/depth encoding tests

* chore(format): formatting code

* docs(depth): improving depth maps docs

* test(fix): fixing depth tests

* test(dataset tools): adding missing tests for new dataset edition tools features

* chore(format): formatting code

* fix(pyav check): fixing PyAV option validation for integer codec options by normalizing
numeric values before calling `is_integer()`

Co-authored-by: Wensi (Vince) Ai <59036629+wensi-ai@users.noreply.github.com>

* docs(mermaid): fixing mermaid diagram

* fix(rebase): rebase follow up corrections

* feat(dataset tools): adding missing docstrings and features for depth fill support in dataset edition tools

* docs(docstring): updating docstrings

* docs(dataset tools): updating docs

* fix(save images): fixing image saving in dataset tools

* fix(update video info): fixing update video info logic to match the recording and editing use cases

* test(reencode): fixing reencoding monkeypatch

* fix(review): add Claude review

* chore(format): format code

* fix(update video info): ditching the differentiated approaches for video info update - video info is always updated, except for preserved keys.

* chore(rebase): fixing rebase merge conflicts

* test(visualization): fixing visualization tests

* feat(docstrings): adding explicit docstrings for encoding parameters. Docstrings will now show up as descriptions in the CLI --help.

* feat(mm as default): adding a global DEFAULT_DEPTH_UNIT variable setting mm as default depth unit

* fix(RGB <-> camera): renaming camera_encoder to rgb_encoder for clarity

* chore(TODO): removing deprecated TODO

* doc(write_u16_plane): improving docstrings for write_u16_plane

* feat(units): adding constants for depth frames units (m and mm)

* fix(spam): replacing spamming warning with a debug log

* feat(legacy metadata): adding automatic metadata update for legacy 'video.is_depth_map' feature

* fix(copy&reindex): fixing metadata reshaping for single channel frames

* fix(ImageNet): excluding depth frames from ImageNet stats

* fix(PyAV container seek): fixing initial PyAV container seek to be robust against codec choice

* feat(lerobot-dataset-viz): adding support for depth in lerobot-dataset-viz

* fix(compress): removing rerun compression for DepthImages

* fix(single channel squeeze): fixing single channel squeezing

* chore(format): format code

* fix(streaming): adding support for dequantization in streaming_dataset.py

* refactor(read depth): factorizing depth reading methods for realsense camera and adding support for depth-only usage

* chore(renaming): fixing missed RGBEncoderConfig renamings

* docs(renaming): reflecting renamings in a clearer way in the docs

* chore(annotation): excluding depth from the annotation pipeline

* feat(robots): adding depth support in compatible follower robots

* feat(LeSadKiwi): excluding LeKiwi from depth support (for now)

* chore(fail): removing misplaced file

* chore(fail): removing misplaced file

* fix(remove ffv1): removing ffv1 as it does not support MP4

* docs(cheat sheet): adding depth and video encoding to the cheat sheet

* fix(lossless): tuning depth encoding parameters for lossless depth storage

* test(fix): fixing failing tests

* depth(ZMQ): excluding ZMQ from depth support

* Revert "depth(ZMQ): excluding ZMQ from depth support"

This reverts commit b95cf4e4c2.

* fix(image transforms): excluding depth frames from images transforms

* fix(typo): typo

* fix(stats): fixing stats computation for depth frames

* fix(TIFF vs. pytorch): adding an extra uint16 to float32 conversion for depth maps stored as raw TIFF images

* fix(typos): fixing typos

* test(dtype): fixing stats computation typing tests

---------

Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Wensi (Vince) Ai <59036629+wensi-ai@users.noreply.github.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Wensi Ai <wsai@stanford.edu>
2026-06-27 14:21:21 +02:00

393 lines
13 KiB
Python

# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import queue
import time
from multiprocessing import queues
from unittest.mock import MagicMock, patch

import numpy as np
import pytest
from PIL import Image

pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")

from lerobot.datasets.image_writer import (
    AsyncImageWriter,
    image_array_to_pil_image,
    safe_stop_image_writer,
    write_image,
)
from tests.fixtures.constants import DUMMY_HWC

DUMMY_IMAGE = "test_image.png"


def test_init_threading():
    writer = AsyncImageWriter(num_processes=0, num_threads=2)
    try:
        assert writer.num_processes == 0
        assert writer.num_threads == 2
        assert isinstance(writer.queue, queue.Queue)
        assert len(writer.threads) == 2
        assert len(writer.processes) == 0
        assert all(t.is_alive() for t in writer.threads)
    finally:
        writer.stop()


def test_init_multiprocessing():
    writer = AsyncImageWriter(num_processes=2, num_threads=2)
    try:
        assert writer.num_processes == 2
        assert writer.num_threads == 2
        assert isinstance(writer.queue, queues.JoinableQueue)
        assert len(writer.threads) == 0
        assert len(writer.processes) == 2
        assert all(p.is_alive() for p in writer.processes)
    finally:
        writer.stop()


def test_zero_threads():
    with pytest.raises(ValueError):
        AsyncImageWriter(num_processes=0, num_threads=0)


def test_image_array_to_pil_image_float_array_wrong_range_0_255():
    image = np.random.rand(*DUMMY_HWC) * 255
    with pytest.raises(ValueError):
        image_array_to_pil_image(image)


def test_image_array_to_pil_image_float_array_wrong_range_neg_1_1():
    image = np.random.rand(*DUMMY_HWC) * 2 - 1
    with pytest.raises(ValueError):
        image_array_to_pil_image(image)


def test_image_array_to_pil_image_rgb(img_array_factory):
    img_array = img_array_factory(100, 100)
    result_image = image_array_to_pil_image(img_array)
    assert isinstance(result_image, Image.Image)
    assert result_image.size == (100, 100)
    assert result_image.mode == "RGB"


def test_image_array_to_pil_image_pytorch_format(img_array_factory):
    img_array = img_array_factory(100, 100).transpose(2, 0, 1)
    result_image = image_array_to_pil_image(img_array)
    assert isinstance(result_image, Image.Image)
    assert result_image.size == (100, 100)
    assert result_image.mode == "RGB"


def test_image_array_to_pil_image_single_channel(img_array_factory):
    img_array = img_array_factory(channels=1)
    with pytest.raises(ValueError, match="Unsupported single-channel image dtype"):
        image_array_to_pil_image(img_array)


def test_image_array_to_pil_image_4_channels(img_array_factory):
    img_array = img_array_factory(channels=4)
    with pytest.raises(NotImplementedError):
        image_array_to_pil_image(img_array)


def test_image_array_to_pil_image_float_array(img_array_factory):
    img_array = img_array_factory(dtype=np.float32)
    result_image = image_array_to_pil_image(img_array)
    assert isinstance(result_image, Image.Image)
    assert result_image.size == (100, 100)
    assert result_image.mode == "RGB"
    assert np.array(result_image).dtype == np.uint8


def test_image_array_to_pil_image_uint8_array(img_array_factory):
    # Use a uint8 input so this test covers a different path than the float32 test.
    img_array = img_array_factory(dtype=np.uint8)
    result_image = image_array_to_pil_image(img_array)
    assert isinstance(result_image, Image.Image)
    assert result_image.size == (100, 100)
    assert result_image.mode == "RGB"
    assert np.array(result_image).dtype == np.uint8


def test_write_image_numpy(tmp_path, img_array_factory):
    image_array = img_array_factory()
    fpath = tmp_path / DUMMY_IMAGE
    write_image(image_array, fpath)
    assert fpath.exists()
    saved_image = np.array(Image.open(fpath))
    assert np.array_equal(image_array, saved_image)


def test_write_image_image(tmp_path, img_factory):
    image_pil = img_factory()
    fpath = tmp_path / DUMMY_IMAGE
    write_image(image_pil, fpath)
    assert fpath.exists()
    saved_image = Image.open(fpath)
    assert list(saved_image.getdata()) == list(image_pil.getdata())
    assert np.array_equal(image_pil, saved_image)


def test_write_image_exception(tmp_path):
    image_array = "invalid data"
    fpath = tmp_path / DUMMY_IMAGE
    with patch("lerobot.datasets.image_writer.logger") as mock_logger:
        write_image(image_array, fpath)
        mock_logger.error.assert_called()
        assert not fpath.exists()


def test_save_image_numpy(tmp_path, img_array_factory):
    writer = AsyncImageWriter()
    try:
        image_array = img_array_factory()
        fpath = tmp_path / DUMMY_IMAGE
        fpath.parent.mkdir(parents=True, exist_ok=True)
        writer.save_image(image_array, fpath)
        writer.wait_until_done()
        assert fpath.exists()
        saved_image = np.array(Image.open(fpath))
        assert np.array_equal(image_array, saved_image)
    finally:
        writer.stop()


def test_save_image_numpy_multiprocessing(tmp_path, img_array_factory):
    writer = AsyncImageWriter(num_processes=2, num_threads=2)
    try:
        image_array = img_array_factory()
        fpath = tmp_path / DUMMY_IMAGE
        writer.save_image(image_array, fpath)
        writer.wait_until_done()
        assert fpath.exists()
        saved_image = np.array(Image.open(fpath))
        assert np.array_equal(image_array, saved_image)
    finally:
        writer.stop()


def test_save_image_torch(tmp_path, img_tensor_factory):
    writer = AsyncImageWriter()
    try:
        image_tensor = img_tensor_factory()
        fpath = tmp_path / DUMMY_IMAGE
        fpath.parent.mkdir(parents=True, exist_ok=True)
        writer.save_image(image_tensor, fpath)
        writer.wait_until_done()
        assert fpath.exists()
        saved_image = np.array(Image.open(fpath))
        expected_image = (image_tensor.permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
        assert np.array_equal(expected_image, saved_image)
    finally:
        writer.stop()


def test_save_image_torch_multiprocessing(tmp_path, img_tensor_factory):
    writer = AsyncImageWriter(num_processes=2, num_threads=2)
    try:
        image_tensor = img_tensor_factory()
        fpath = tmp_path / DUMMY_IMAGE
        writer.save_image(image_tensor, fpath)
        writer.wait_until_done()
        assert fpath.exists()
        saved_image = np.array(Image.open(fpath))
        expected_image = (image_tensor.permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
        assert np.array_equal(expected_image, saved_image)
    finally:
        writer.stop()


def test_save_image_pil(tmp_path, img_factory):
    writer = AsyncImageWriter()
    try:
        image_pil = img_factory()
        fpath = tmp_path / DUMMY_IMAGE
        fpath.parent.mkdir(parents=True, exist_ok=True)
        writer.save_image(image_pil, fpath)
        writer.wait_until_done()
        assert fpath.exists()
        saved_image = Image.open(fpath)
        assert list(saved_image.getdata()) == list(image_pil.getdata())
    finally:
        writer.stop()


def test_save_image_pil_multiprocessing(tmp_path, img_factory):
    writer = AsyncImageWriter(num_processes=2, num_threads=2)
    try:
        image_pil = img_factory()
        fpath = tmp_path / DUMMY_IMAGE
        writer.save_image(image_pil, fpath)
        writer.wait_until_done()
        assert fpath.exists()
        saved_image = Image.open(fpath)
        assert list(saved_image.getdata()) == list(image_pil.getdata())
    finally:
        writer.stop()


def test_save_image_invalid_data(tmp_path):
    writer = AsyncImageWriter()
    try:
        image_array = "invalid data"
        fpath = tmp_path / DUMMY_IMAGE
        fpath.parent.mkdir(parents=True, exist_ok=True)
        with patch("lerobot.datasets.image_writer.logger") as mock_logger:
            writer.save_image(image_array, fpath)
            writer.wait_until_done()
            mock_logger.error.assert_called()
            assert not fpath.exists()
    finally:
        writer.stop()


def test_save_image_after_stop(tmp_path, img_array_factory):
    writer = AsyncImageWriter()
    writer.stop()
    image_array = img_array_factory()
    fpath = tmp_path / DUMMY_IMAGE
    writer.save_image(image_array, fpath)
    time.sleep(1)
    assert not fpath.exists()


def test_stop():
    writer = AsyncImageWriter(num_processes=0, num_threads=2)
    writer.stop()
    assert not any(t.is_alive() for t in writer.threads)


def test_stop_multiprocessing():
    writer = AsyncImageWriter(num_processes=2, num_threads=2)
    writer.stop()
    assert not any(p.is_alive() for p in writer.processes)


def test_multiple_stops():
    writer = AsyncImageWriter()
    writer.stop()
    writer.stop()  # Should not raise an exception
    assert not any(t.is_alive() for t in writer.threads)


def test_multiple_stops_multiprocessing():
    writer = AsyncImageWriter(num_processes=2, num_threads=2)
    writer.stop()
    writer.stop()  # Should not raise an exception
    # In multiprocessing mode the workers live in writer.processes; writer.threads is empty.
    assert not any(p.is_alive() for p in writer.processes)


def test_wait_until_done(tmp_path, img_array_factory):
    writer = AsyncImageWriter(num_processes=0, num_threads=4)
    try:
        num_images = 100
        image_arrays = [img_array_factory(height=500, width=500) for _ in range(num_images)]
        fpaths = [tmp_path / f"frame_{i:06d}.png" for i in range(num_images)]
        for image_array, fpath in zip(image_arrays, fpaths, strict=True):
            fpath.parent.mkdir(parents=True, exist_ok=True)
            writer.save_image(image_array, fpath)
        writer.wait_until_done()
        for i, fpath in enumerate(fpaths):
            assert fpath.exists()
            saved_image = np.array(Image.open(fpath))
            assert np.array_equal(saved_image, image_arrays[i])
    finally:
        writer.stop()


def test_wait_until_done_multiprocessing(tmp_path, img_array_factory):
    writer = AsyncImageWriter(num_processes=2, num_threads=2)
    try:
        num_images = 100
        image_arrays = [img_array_factory() for _ in range(num_images)]
        fpaths = [tmp_path / f"frame_{i:06d}.png" for i in range(num_images)]
        for image_array, fpath in zip(image_arrays, fpaths, strict=True):
            fpath.parent.mkdir(parents=True, exist_ok=True)
            writer.save_image(image_array, fpath)
        writer.wait_until_done()
        for i, fpath in enumerate(fpaths):
            assert fpath.exists()
            saved_image = np.array(Image.open(fpath))
            assert np.array_equal(saved_image, image_arrays[i])
    finally:
        writer.stop()


def test_exception_handling(tmp_path, img_array_factory):
    writer = AsyncImageWriter()
    try:
        image_array = img_array_factory()
        with (
            patch.object(writer.queue, "put", side_effect=queue.Full("Queue is full")),
            pytest.raises(queue.Full) as exc_info,
        ):
            writer.save_image(image_array, tmp_path / "test.png")
        assert str(exc_info.value) == "Queue is full"
    finally:
        writer.stop()


def test_with_different_image_formats(tmp_path, img_array_factory):
    writer = AsyncImageWriter()
    try:
        image_array = img_array_factory()
        formats = ["png", "tiff", "tif"]
        for fmt in formats:
            fpath = tmp_path / f"test_image.{fmt}"
            write_image(image_array, fpath)
            assert fpath.exists()
    finally:
        writer.stop()


def test_safe_stop_image_writer_decorator():
    class MockWriter:
        def __init__(self):
            self.image_writer = MagicMock(spec=AsyncImageWriter)

    class MockDataset:
        def __init__(self):
            self.writer = MockWriter()

    @safe_stop_image_writer
    def function_that_raises_exception(dataset=None):
        raise Exception("Test exception")

    dataset = MockDataset()

    with pytest.raises(Exception) as exc_info:
        function_that_raises_exception(dataset=dataset)

    assert str(exc_info.value) == "Test exception"
    dataset.writer.image_writer.stop.assert_called_once()


def test_main_process_time(tmp_path, img_tensor_factory):
    writer = AsyncImageWriter()
    try:
        image_tensor = img_tensor_factory()
        fpath = tmp_path / DUMMY_IMAGE
        start_time = time.perf_counter()
        writer.save_image(image_tensor, fpath)
        end_time = time.perf_counter()
        time_spent = end_time - start_time
        # Might need to adjust this threshold depending on hardware
        assert time_spent < 0.01, f"Main process time exceeded threshold: {time_spent}s"
        writer.wait_until_done()
        assert fpath.exists()
    finally:
        writer.stop()
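
The tests above exercise a writer that hands work to background workers, blocks in wait_until_done(), and tolerates repeated stop() calls. A minimal stdlib-only sketch of that pattern, assuming a plain queue.Queue drained by worker threads, might look like the following. TinyAsyncWriter, submit, and results are hypothetical names used for illustration only; this is not LeRobot's AsyncImageWriter, and the doubling step merely stands in for writing an image to disk.

```python
import queue
import threading


class TinyAsyncWriter:
    """Hypothetical stand-in for an async writer: worker threads drain a queue."""

    def __init__(self, num_threads=2):
        if num_threads < 1:
            raise ValueError("num_threads must be at least 1")
        self.queue = queue.Queue()
        self.results = []  # filled by workers; list.append is atomic in CPython
        self._stopped = False
        self.threads = [
            threading.Thread(target=self._worker, daemon=True) for _ in range(num_threads)
        ]
        for t in self.threads:
            t.start()

    def _worker(self):
        while True:
            item = self.queue.get()
            try:
                if item is None:  # sentinel: shut this worker down
                    return
                self.results.append(item * 2)  # placeholder for the real disk write
            finally:
                # Always mark the item as handled so queue.join() can return.
                self.queue.task_done()

    def submit(self, item):
        # Assumption of this sketch: work submitted after stop() is silently dropped.
        if not self._stopped:
            self.queue.put(item)

    def wait_until_done(self):
        # Blocks until every queued item has been marked done by a worker.
        self.queue.join()

    def stop(self):
        if self._stopped:  # calling stop() twice is a no-op
            return
        self._stopped = True
        for _ in self.threads:
            self.queue.put(None)  # one sentinel per worker
        for t in self.threads:
            t.join()


writer = TinyAsyncWriter(num_threads=2)
for i in range(5):
    writer.submit(i)
writer.wait_until_done()
writer.stop()
writer.stop()  # should not raise
writer.submit(99)  # dropped because the writer is stopped
```

The caller only pays for a queue put on each submit, which is the property test_main_process_time checks against the real writer; the sentinel-per-worker shutdown is one common way to make stop() deterministic.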