mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-28 21:57:27 +00:00
3dd19d043e
* feat(depth): add depth quantization helpers and tests
* feat(video): add ffv1 to supported codecs
* feat(depth): persist depth metadata
* feat(depth): extend quantization tools to better fit the encoding/decoding pipeline
* feat(depth): plumb DepthEncoderConfig through LeRobotDataset and DatasetWriter
* feat(depth): wire StreamingVideoEncoder + writer to depth encoder
* feat(depth): wire DatasetReader to decode_depth_frames
* feat(cameras/realsense): expose async depth in metric meters
* feat(features): route 2D camera shapes to observation.depth.<key>
* feat(robots/so_follower): emit + populate depth keys when use_depth
* feat(record): plumb DepthEncoderConfig through lerobot-record
* feat(viz): render depth observations as rr.DepthImage in Viridis
* feat(depth maps writer): adding support for raw depth maps recording with image writer
* chore(format): format code
* feat(depth shape): ensuring depth maps shape is always including the channel
* feat(is_depth): simplifying is_depth nested name + legacy support
* fix(stop_event): fixing stop_event race condition in camera classes
* fix(plumbing): fixing missing parts in the depth maps pipeline
* chore(typos): fixing typos
* test(fix): fixing existing tests to still work with latest features
* tests(depth): adding new tests for depth integration validation
* feat(pix_fmt channels): use PyAv to check get pixel formats number of channels
* feat(refactor): refactor DepthEncoderConfig quantization pipeline, so that the methods do not live in the config class. Add pixel format - channels validation. Move the default pixel format for depth to the config file.
* fix(pre-commit): fixing mutable default value
* fix(info): fixing info metadata update when is_depth_map was set
* tests(typos): fixing typos in tests
* fix(realsense): fixing typo in realsense serial number
* fix(normalization): restricting 255 normalization to non depth/uint8 images only
* fix(typo): fixing typo
* fix(TIFF): add missing quantization and cleanup for TIFF files
* feat(batched dequantization): optimizing dequantize_depth for torch based batched dequantization
* feat(tools): adding depth support in LeRobotDataset edition tools
* test(aggregate): extending aggregation tests to depth frames
* test(cleaning): cleaning up tests
* fix(from_video_info): fixing early validation issue in from_video_info
* fix(typo): fixing typo
* fix(is_depth): adding missing docstrings and is_depth arguments in video decoding functions
Co-authored-by: Wensi (Vince) Ai <59036629+wensi-ai@users.noreply.github.com>
* fix(depth units): fixing depth units output for the realsense cameras
* feat(output unit): adding support for output unit specification at dataset reading/training time
Co-authored-by: Wensi (Vince) Ai <59036629+wensi-ai@users.noreply.github.com>
* test(depth): cleaning up depth tests
* test(depth encoding): updating and cleaning video/depth encoding tests
* chore(format): formatting code
* docs(depth): improving depth maps docs
* test(fix): fixing depth tests
* test(dataset tools): adding missing tests for new dataset edition tools features
* chore(format): formatting code
* fix(pyav check): fixing PyAV option validation for integer codec options by normalizing
numeric values before calling `is_integer()`
Co-authored-by: Wensi (Vince) Ai <59036629+wensi-ai@users.noreply.github.com>
* docs(mermaid): fixing mermaid diagram
* fix(rebase): rebase follow up corrections
* feat(dataset tools): adding missing docstrings and features for depth fill support in dataset edition tools
* docs(docstring): updating docstrings
* docs(dataset tools): updating docs
* fix(save images): fixing image saving in dataset tools
* fix(update video info): fixing update video info logic to match the recording and editing use cases
* test(reencode): fixing reencoding monkeypatch
* fix(review): add Claude review
* chore(format): format code
* fix(update video info): ditching the differentiated approaches for video info update - video info is always updated except for preserved keys.
* chore(rebase): fixing rebase merge conflicts
* test(visualization): fixing visualization tests
* feat(docstrings): adding explicit docstrings for encoding parameters. Docstrings will now show up as descriptions in the CLI --help.
* feat(mm as default): adding a global DEFAULT_DEPTH_UNIT variable setting mm as default depth unit
* fix(RGB <-> camera): renaming camera_encoder to rgb_encoder for clarity
* chore(TODO): removing deprecated TODO
* doc(write_u16_plane): improving docstrings for write_u16_plane
* feat(units): adding constants for depth frames units (m and mm)
* fix(spam): replacing spamming warning with a debug log
* feat(legacy metadata): adding automatic metadata update for legacy 'video.is_depth_map' feature
* fix(copy&reindex): fixing metadata reshaping for single channel frames
* fix(ImageNet): excluding depth frames from ImageNet stats
* fix(PyAV container seek): fixing initial PyAV container seek to be robust against codec choice
* feat(lerobot-dataset-viz): adding support for depth in lerobot-dataset-viz
* fix(compress): removing rerun compression for DepthImages
* fix(single channel squeeze): fixing single channel squeezing
* chore(format): format code
* fix(streaming): adding support for dequantization in streaming_dataset.py
* refactor(read depth): factorizing depth reading methods for realsense camera and adding support for depth-only usage
* chore(renaming): fixing missed RGBEncoderConfig renamings
* docs(renaming): reflecting renamings in a clearer way in the docs
* chore(annotation): excluding depth from the annotation pipeline
* feat(robots): adding depth support in compatible follower robots
* feat(LeSadKiwi): excluding LeKiwi from depth support (for now)
* chore(fail): removing misplaced file
* chore(fail): removing misplaced file
* fix(remove ffv1): removing ffv1 as it does not support MP4
* docs(cheat sheet): adding depth and video encoding to the cheat sheet
* fix(lossless): tuning depth encoding parameters for lossless depth storage
* test(fix): fixing failing tests
* depth(ZMQ): excluding ZMQ from depth support
* Revert "depth(ZMQ): excluding ZMQ from depth support"
This reverts commit b95cf4e4c2.
* fix(image transforms): excluding depth frames from images transforms
* fix(typo): typo
* fix(stats): fixing stats computation for depth frames
* fix(TIFF vs. pytorch): adding an extra uint16 to float32 conversion for depth maps stored as raw TIFF images
* fix(typos): fixing typos
* test(dtype): fixing stats computation typing tests
---------
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Wensi (Vince) Ai <59036629+wensi-ai@users.noreply.github.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Wensi Ai <wsai@stanford.edu>
668 lines
24 KiB
Python
668 lines
24 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Tests for streaming video encoding."""
|
|
|
|
import queue
|
|
import threading
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
pytest.importorskip("av", reason="av is required (install lerobot[dataset])")
|
|
|
|
import av # noqa: E402
|
|
|
|
from lerobot.configs import RGBEncoderConfig
|
|
from lerobot.datasets.pyav_utils import get_codec
|
|
from lerobot.datasets.video_utils import (
|
|
StreamingVideoEncoder,
|
|
_CameraEncoderThread,
|
|
)
|
|
from lerobot.utils.constants import OBS_IMAGES
|
|
|
|
# Cross-codec validation tests are only exercised when the target codec
# exists in the local FFmpeg build; on other platforms validate() is a no-op.
_has_videotoolbox = get_codec("h264_videotoolbox") is not None
_videotoolbox_only = pytest.mark.skipif(
    not _has_videotoolbox,
    reason="h264_videotoolbox not in local FFmpeg build",
)
|
|
|
|
|
|
# ─── _CameraEncoderThread tests ───
|
|
|
|
|
|
class TestCameraEncoderThread:
    """Tests driving ``_CameraEncoderThread`` directly through its queues."""

    @staticmethod
    def _start_encoder_thread(video_path, fps=30):
        """Create and start an encoder thread writing to ``video_path``.

        Args:
            video_path: Destination path handed to the encoder thread.
            fps: Frame rate handed to the encoder thread.

        Returns:
            A ``(encoder_thread, frame_queue, result_queue, stop_event)`` tuple.
        """
        frame_queue: queue.Queue = queue.Queue(maxsize=60)
        result_queue: queue.Queue = queue.Queue(maxsize=1)
        stop_event = threading.Event()

        enc_cfg = RGBEncoderConfig(vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30, preset=13)
        encoder_thread = _CameraEncoderThread(
            video_path=video_path,
            fps=fps,
            video_encoder=enc_cfg,
            frame_queue=frame_queue,
            result_queue=result_queue,
            stop_event=stop_event,
        )
        encoder_thread.start()
        return encoder_thread, frame_queue, result_queue, stop_event

    def test_encodes_valid_mp4(self, tmp_path):
        """Test that the encoder thread creates a valid MP4 file with correct frame count."""
        num_frames = 30
        height, width = 64, 96
        video_path = tmp_path / "test_output" / "test.mp4"

        encoder_thread, frame_queue, result_queue, _stop_event = self._start_encoder_thread(
            video_path, fps=30
        )

        # Feed frames (HWC uint8). randint's upper bound is exclusive, so 256
        # is needed to cover the full uint8 range.
        for _ in range(num_frames):
            frame_queue.put(np.random.randint(0, 256, (height, width, 3), dtype=np.uint8))

        # Send sentinel
        frame_queue.put(None)
        encoder_thread.join(timeout=60)
        # join() returns silently on timeout, so liveness must be checked explicitly.
        assert not encoder_thread.is_alive()

        # Check result
        status, data = result_queue.get(timeout=5)
        assert status == "ok"
        assert data is not None  # Stats should be returned
        for stat_name in ("mean", "std", "min", "max", "count"):
            assert stat_name in data

        # Verify the MP4 file is valid
        assert video_path.exists()
        with av.open(str(video_path)) as container:
            stream = container.streams.video[0]
            # The frame count should match
            total_frames = sum(1 for _ in container.decode(stream))
        assert total_frames == num_frames

    def test_handles_chw_input(self, tmp_path):
        """Test that CHW format input is handled correctly."""
        num_frames = 5
        video_path = tmp_path / "test_chw" / "test.mp4"

        encoder_thread, frame_queue, result_queue, _stop_event = self._start_encoder_thread(
            video_path, fps=30
        )

        # Feed CHW frames
        for _ in range(num_frames):
            frame_queue.put(np.random.randint(0, 256, (3, 64, 96), dtype=np.uint8))

        frame_queue.put(None)
        encoder_thread.join(timeout=60)
        # Fail here, with a clear message, if the thread hung instead of
        # surfacing later as an unrelated queue.Empty.
        assert not encoder_thread.is_alive()

        status, _ = result_queue.get(timeout=5)
        assert status == "ok"
        assert video_path.exists()

    def test_stop_event_cancellation(self, tmp_path):
        """Test that setting the stop event causes the thread to exit."""
        video_path = tmp_path / "test_cancel" / "test.mp4"

        encoder_thread, frame_queue, _result_queue, stop_event = self._start_encoder_thread(
            video_path, fps=30
        )

        # Feed a few frames
        for _ in range(3):
            frame_queue.put(np.random.randint(0, 256, (64, 96, 3), dtype=np.uint8))

        # Signal stop instead of sending sentinel
        stop_event.set()
        encoder_thread.join(timeout=10)
        assert not encoder_thread.is_alive()
|
|
|
|
|
|
# ─── StreamingVideoEncoder tests ───
|
|
|
|
|
|
class TestStreamingVideoEncoder:
    """Tests for the ``StreamingVideoEncoder`` episode lifecycle.

    Every test closes its encoder in a ``finally`` block so that a failing
    assertion does not skip ``close()``.
    """

    def _make_encoder_config(self, **kwargs):
        """Helper to build an RGBEncoderConfig."""
        return RGBEncoderConfig(**kwargs)

    @staticmethod
    def _random_frame(height=64, width=96):
        """Return a random HWC uint8 frame (randint's upper bound is exclusive)."""
        return np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)

    @staticmethod
    def _count_frames(mp4_path):
        """Decode the first video stream of ``mp4_path`` and return its frame count."""
        with av.open(str(mp4_path)) as container:
            stream = container.streams.video[0]
            return sum(1 for _ in container.decode(stream))

    def test_single_camera_episode(self, tmp_path):
        """Test encoding a single camera episode."""
        key = f"{OBS_IMAGES}.laptop"
        video_keys = [key]
        encoder = StreamingVideoEncoder(
            fps=30,
            rgb_encoder=self._make_encoder_config(
                vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30, preset=13
            ),
        )
        try:
            encoder.start_episode(video_keys, tmp_path)

            num_frames = 20
            for _ in range(num_frames):
                encoder.feed_frame(key, self._random_frame())

            results = encoder.finish_episode()
            assert key in results

            mp4_path, stats = results[key]
            assert mp4_path.exists()
            assert stats is not None

            # Verify frame count
            assert self._count_frames(mp4_path) == num_frames
        finally:
            encoder.close()

    def test_multi_camera_episode(self, tmp_path):
        """Test encoding multiple cameras simultaneously."""
        video_keys = [f"{OBS_IMAGES}.laptop", f"{OBS_IMAGES}.phone"]
        encoder = StreamingVideoEncoder(
            fps=30,
            rgb_encoder=self._make_encoder_config(vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30),
        )
        try:
            encoder.start_episode(video_keys, tmp_path)

            num_frames = 15
            for _ in range(num_frames):
                for key in video_keys:
                    encoder.feed_frame(key, self._random_frame())

            results = encoder.finish_episode()

            for key in video_keys:
                assert key in results
                mp4_path, stats = results[key]
                assert mp4_path.exists()
                assert stats is not None
        finally:
            encoder.close()

    def test_sequential_episodes(self, tmp_path):
        """Test that multiple sequential episodes work correctly."""
        key = f"{OBS_IMAGES}.cam"
        video_keys = [key]
        encoder = StreamingVideoEncoder(
            fps=30,
            rgb_encoder=self._make_encoder_config(vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30),
        )
        try:
            for ep in range(3):
                encoder.start_episode(video_keys, tmp_path)
                num_frames = 10 + ep * 5
                for _ in range(num_frames):
                    encoder.feed_frame(key, self._random_frame())
                results = encoder.finish_episode()

                mp4_path, stats = results[key]
                assert mp4_path.exists()
                assert self._count_frames(mp4_path) == num_frames
        finally:
            encoder.close()

    def test_cancel_episode(self, tmp_path):
        """Test that canceling an episode cleans up properly."""
        key = f"{OBS_IMAGES}.cam"
        video_keys = [key]
        encoder = StreamingVideoEncoder(
            fps=30,
            rgb_encoder=self._make_encoder_config(vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30),
        )
        try:
            encoder.start_episode(video_keys, tmp_path)
            for _ in range(5):
                encoder.feed_frame(key, self._random_frame())

            encoder.cancel_episode()

            # Should be able to start a new episode after cancel
            encoder.start_episode(video_keys, tmp_path)
            for _ in range(5):
                encoder.feed_frame(key, self._random_frame())
            results = encoder.finish_episode()

            assert key in results
        finally:
            encoder.close()

    def test_feed_without_start_raises(self, tmp_path):
        """Test that feeding frames without starting an episode raises."""
        encoder = StreamingVideoEncoder(fps=30)
        try:
            with pytest.raises(RuntimeError, match="No active episode"):
                encoder.feed_frame("cam", np.zeros((64, 96, 3), dtype=np.uint8))
        finally:
            encoder.close()

    def test_finish_without_start_raises(self, tmp_path):
        """Test that finishing without starting raises."""
        encoder = StreamingVideoEncoder(fps=30)
        try:
            with pytest.raises(RuntimeError, match="No active episode"):
                encoder.finish_episode()
        finally:
            encoder.close()

    def test_close_is_idempotent(self, tmp_path):
        """Test that close() can be called multiple times safely."""
        encoder = StreamingVideoEncoder(fps=30)
        encoder.close()
        encoder.close()  # Should not raise

    def test_video_duration_matches_frame_count(self, tmp_path):
        """Test that encoded video duration matches num_frames / fps."""
        fps = 30
        key = f"{OBS_IMAGES}.cam"
        video_keys = [key]
        encoder = StreamingVideoEncoder(
            fps=fps,
            rgb_encoder=self._make_encoder_config(
                vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30, preset=13
            ),
        )
        try:
            encoder.start_episode(video_keys, tmp_path)

            num_frames = 90  # 3 seconds at 30fps
            for _ in range(num_frames):
                encoder.feed_frame(key, self._random_frame())

            results = encoder.finish_episode()
            mp4_path, _ = results[key]

            # Derived from the same fps given to the encoder, not a second literal.
            expected_duration = num_frames / fps  # 3.0 seconds

            with av.open(str(mp4_path)) as container:
                stream = container.streams.video[0]
                total_frames = sum(1 for _ in container.decode(stream))
                # Fall back to the container duration when the stream has none.
                if stream.duration is not None:
                    actual_duration = float(stream.duration * stream.time_base)
                else:
                    actual_duration = float(container.duration / av.time_base)

            assert total_frames == num_frames
            # Allow small tolerance for duration due to codec framing
            assert abs(actual_duration - expected_duration) < 0.5, (
                f"Video duration {actual_duration:.2f}s != expected {expected_duration:.2f}s"
            )
        finally:
            encoder.close()

    def test_multi_camera_start_episode_called_once(self, tmp_path):
        """Test that with multiple cameras, no frames are lost due to double start_episode."""
        video_keys = [f"{OBS_IMAGES}.cam1", f"{OBS_IMAGES}.cam2"]
        encoder = StreamingVideoEncoder(
            fps=30,
            rgb_encoder=self._make_encoder_config(vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30),
        )
        try:
            encoder.start_episode(video_keys, tmp_path)

            num_frames = 30
            for _ in range(num_frames):
                for key in video_keys:
                    encoder.feed_frame(key, self._random_frame())

            results = encoder.finish_episode()

            # Both cameras should have all frames
            for key in video_keys:
                mp4_path, stats = results[key]
                assert mp4_path.exists()
                total_frames = self._count_frames(mp4_path)
                assert total_frames == num_frames, (
                    f"Camera {key}: expected {num_frames} frames, got {total_frames}"
                )
        finally:
            encoder.close()

    def test_encoder_threads_passed_to_thread(self, tmp_path):
        """Test that encoder_threads is stored and passed through to encoder threads."""
        key = f"{OBS_IMAGES}.cam"
        video_keys = [key]
        cfg = RGBEncoderConfig(
            vcodec="libsvtav1",
            pix_fmt="yuv420p",
            g=2,
            crf=30,
        )
        encoder = StreamingVideoEncoder(
            fps=30,
            rgb_encoder=cfg,
            encoder_threads=2,
        )
        try:
            assert encoder._encoder_threads == 2
            encoder.start_episode(video_keys, tmp_path)

            # Verify codec options include thread tuning for libsvtav1 (lp=…)
            thread = encoder._threads[key]
            codec_opts = thread.video_encoder.get_codec_options(encoder_threads=thread.encoder_threads)
            assert "svtav1-params" in codec_opts or "threads" in codec_opts

            # Feed some frames and finish to ensure it works end-to-end
            num_frames = 10
            for _ in range(num_frames):
                encoder.feed_frame(key, self._random_frame())

            results = encoder.finish_episode()
            mp4_path, stats = results[key]
            assert mp4_path.exists()
            assert stats is not None
            assert self._count_frames(mp4_path) == num_frames
        finally:
            encoder.close()

    def test_encoder_threads_none_by_default(self, tmp_path):
        """Test that encoder_threads defaults to None (codec auto-detect)."""
        encoder = StreamingVideoEncoder(fps=30)
        try:
            assert encoder._encoder_threads is None
        finally:
            encoder.close()

    def test_graceful_frame_dropping(self, tmp_path):
        """Test that full queue drops frames instead of crashing."""
        key = f"{OBS_IMAGES}.cam"
        video_keys = [key]
        encoder = StreamingVideoEncoder(
            fps=30,
            rgb_encoder=self._make_encoder_config(
                vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30, preset=13
            ),
            queue_maxsize=1,
        )
        try:
            encoder.start_episode(video_keys, tmp_path)

            # Feed many frames quickly - with queue_maxsize=1, some will be dropped
            num_frames = 50
            for _ in range(num_frames):
                encoder.feed_frame(key, self._random_frame())

            # Should not raise - frames are dropped gracefully
            results = encoder.finish_episode()
            assert key in results

            mp4_path, _ = results[key]
            assert mp4_path.exists()

            # Some frames should have been dropped (queue was tiny)
            dropped = encoder._dropped_frames.get(key, 0)
            # We can't guarantee drops but can verify no crash occurred
            assert dropped >= 0
        finally:
            encoder.close()
|
|
|
|
|
|
# ─── Integration tests with LeRobotDataset ───
|
|
|
|
|
|
class TestStreamingEncoderIntegration:
    """Integration tests running streaming encoding through ``LeRobotDataset``.

    Every test calls ``dataset.finalize()`` in a ``finally`` block so that a
    failing assertion does not skip it.
    """

    @staticmethod
    def _video_feature():
        """Return the feature spec used for every 64x96 RGB video key in these tests."""
        return {
            "dtype": "video",
            "shape": (64, 96, 3),
            "names": ["height", "width", "channels"],
        }

    @staticmethod
    def _action_feature(names):
        """Return a float32 action feature spec with one dimension per name."""
        return {"dtype": "float32", "shape": (len(names),), "names": list(names)}

    @staticmethod
    def _create_dataset(repo_id, root, features, streaming_encoding=True):
        """Create a 30 fps video dataset under ``root`` via ``LeRobotDataset.create``."""
        # Imported here, as in the rest of this class, rather than at module level.
        from lerobot.datasets.lerobot_dataset import LeRobotDataset

        return LeRobotDataset.create(
            repo_id=repo_id,
            fps=30,
            features=features,
            root=root,
            use_videos=True,
            streaming_encoding=streaming_encoding,
        )

    @staticmethod
    def _make_frame(camera_keys, action_dim, task):
        """Build one frame dict with random images, a random action and a task string."""
        # randint's upper bound is exclusive, so 256 covers the full uint8 range.
        frame = {key: np.random.randint(0, 256, (64, 96, 3), dtype=np.uint8) for key in camera_keys}
        frame["action"] = np.random.randn(action_dim).astype(np.float32)
        frame["task"] = task
        return frame

    def test_add_frame_save_episode_streaming(self, tmp_path):
        """Full integration test: add_frame -> save_episode with streaming encoding."""
        cam_key = "observation.images.cam"
        features = {
            cam_key: self._video_feature(),
            "action": self._action_feature(["j1", "j2", "j3", "j4", "j5", "j6"]),
        }

        dataset = self._create_dataset(
            "test/streaming", tmp_path / "streaming_test", features, streaming_encoding=True
        )
        try:
            assert dataset.writer._streaming_encoder is not None

            num_frames = 20
            for _ in range(num_frames):
                dataset.add_frame(self._make_frame([cam_key], 6, "test task"))

            dataset.save_episode()

            # Verify dataset metadata
            assert dataset.meta.total_episodes == 1
            assert dataset.meta.total_frames == num_frames

            # Verify stats exist for the video key
            assert dataset.meta.stats is not None
            assert cam_key in dataset.meta.stats
            assert "action" in dataset.meta.stats
        finally:
            dataset.finalize()

    def test_streaming_disabled_creates_pngs(self, tmp_path):
        """Test that disabling streaming encoding falls back to PNG path."""
        cam_key = "observation.images.cam"
        features = {
            cam_key: self._video_feature(),
            "action": self._action_feature(["j1", "j2", "j3", "j4", "j5", "j6"]),
        }

        dataset = self._create_dataset(
            "test/no_streaming", tmp_path / "no_streaming_test", features, streaming_encoding=False
        )
        try:
            assert dataset.writer._streaming_encoder is None

            num_frames = 5
            for _ in range(num_frames):
                dataset.add_frame(self._make_frame([cam_key], 6, "test task"))

            # With streaming disabled, PNG files should be written
            images_dir = dataset.root / "images"
            assert images_dir.exists()

            dataset.save_episode()
        finally:
            dataset.finalize()

    def test_multi_episode_streaming(self, tmp_path):
        """Test recording multiple episodes with streaming encoding."""
        cam_key = "observation.images.cam"
        features = {
            cam_key: self._video_feature(),
            "action": self._action_feature(["j1", "j2"]),
        }

        dataset = self._create_dataset(
            "test/multi_ep", tmp_path / "multi_ep_test", features, streaming_encoding=True
        )
        try:
            for ep in range(3):
                num_frames = 10 + ep * 5
                for _ in range(num_frames):
                    dataset.add_frame(self._make_frame([cam_key], 2, f"task_{ep}"))
                dataset.save_episode()

            assert dataset.meta.total_episodes == 3
            assert dataset.meta.total_frames == 10 + 15 + 20
        finally:
            dataset.finalize()

    def test_clear_episode_buffer_cancels_streaming(self, tmp_path):
        """Test that clearing episode buffer cancels streaming encoding."""
        cam_key = "observation.images.cam"
        features = {
            cam_key: self._video_feature(),
            "action": self._action_feature(["j1", "j2"]),
        }

        dataset = self._create_dataset(
            "test/cancel", tmp_path / "cancel_test", features, streaming_encoding=True
        )
        try:
            # Add some frames
            for _ in range(5):
                dataset.add_frame(self._make_frame([cam_key], 2, "task"))

            # Cancel and re-record
            dataset.clear_episode_buffer()

            # Record a new episode
            for _ in range(10):
                dataset.add_frame(self._make_frame([cam_key], 2, "task"))
            dataset.save_episode()

            assert dataset.meta.total_episodes == 1
            assert dataset.meta.total_frames == 10
        finally:
            dataset.finalize()

    def test_multi_camera_add_frame_streaming(self, tmp_path):
        """Test that start_episode is called once with multiple video keys."""
        cam_keys = ["observation.images.cam1", "observation.images.cam2"]
        features = {key: self._video_feature() for key in cam_keys}
        features["action"] = self._action_feature(["j1", "j2"])

        dataset = self._create_dataset(
            "test/multi_cam", tmp_path / "multi_cam_test", features, streaming_encoding=True
        )
        try:
            num_frames = 15
            for _ in range(num_frames):
                dataset.add_frame(self._make_frame(cam_keys, 2, "test task"))

            dataset.save_episode()

            assert dataset.meta.total_episodes == 1
            assert dataset.meta.total_frames == num_frames
        finally:
            dataset.finalize()
|