From e96339a3b49ac080e11664d2be4737987d861a57 Mon Sep 17 00:00:00 2001 From: Steven Palma Date: Mon, 23 Feb 2026 13:57:43 +0100 Subject: [PATCH] feat(dataset): add streaming video encoding + HW encoder support (#2974) * feat(dataset): init stream encoding * feat(dataset): use threads to fix frame pickle latency * refactor(dataset): remove HW encoded related changes * add lp (#2977) * feat(dataset): add Hw encoding + log drop frames (#2978) * chore(docs): add streaming video encoding guide * fix(dataset): style docs + testing * chore(docs): simplify sttreaming video encoding guide * chore(dataset): add commands + streaming encoding default false + print note if false + queue default is now 30 * chore(docs): add verification note advice * chore(dataset): adjusting defaults & docs for streaming encoding * docs(scripts): improve docstrings * test(dataset): polish streaming encoding tests * chore(dataset): move FYI log related to streaming * chore(dataset): add arg vcodec to suggestions * refactor(dataset): better handling for auto and available vcodec * chore(dataset): change log level * docs(dataset): add note related to training performance vcodec * docs(dataset): add more notes to streaming encoding --------- Co-authored-by: Caroline Pascal Co-authored-by: Pepijn --- docs/source/_toctree.yml | 2 + docs/source/act.mdx | 3 + docs/source/earthrover_mini_plus.mdx | 3 + docs/source/groot.mdx | 9 +- docs/source/hope_jr.mdx | 6 + docs/source/il_robots.mdx | 8 +- docs/source/lerobot-dataset-v3.mdx | 5 +- docs/source/reachy2.mdx | 6 + docs/source/smolvla.mdx | 3 + docs/source/streaming_video_encoding.mdx | 155 ++++ docs/source/unitree_g1.mdx | 10 +- src/lerobot/datasets/lerobot_dataset.py | 124 ++- src/lerobot/datasets/video_utils.py | 480 +++++++++++- src/lerobot/scripts/lerobot_record.py | 34 +- tests/datasets/test_datasets.py | 9 +- .../datasets/test_streaming_video_encoder.py | 730 ++++++++++++++++++ 16 files changed, 1532 insertions(+), 55 deletions(-) create mode 100644 docs/source/streaming_video_encoding.mdx create mode 100644 tests/datasets/test_streaming_video_encoder.py diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml index d61aac9c1..1055975d7 100644 --- a/docs/source/_toctree.yml +++ b/docs/source/_toctree.yml @@ -29,6 +29,8 @@ title: Using the Dataset Tools - local: dataset_subtask title: Using Subtasks in the Dataset + - local: streaming_video_encoding + title: Streaming Video Encoding title: "Datasets" - sections: - local: act diff --git a/docs/source/act.mdx b/docs/source/act.mdx index e3294ca69..453bcbba8 100644 --- a/docs/source/act.mdx +++ b/docs/source/act.mdx @@ -88,5 +88,8 @@ lerobot-record \ --dataset.repo_id=${HF_USER}/eval_act_your_dataset \ --dataset.num_episodes=10 \ --dataset.single_task="Your task description" \ + --dataset.streaming_encoding=true \ + --dataset.encoder_threads=2 \ + # --dataset.vcodec=auto \ --policy.path=${HF_USER}/act_policy ``` diff --git a/docs/source/earthrover_mini_plus.mdx b/docs/source/earthrover_mini_plus.mdx index dd9c2ad2b..cfc3a2eef 100644 --- a/docs/source/earthrover_mini_plus.mdx +++ b/docs/source/earthrover_mini_plus.mdx @@ -192,6 +192,9 @@ lerobot-record \ --dataset.num_episodes=2 \ --dataset.fps=10 \ --dataset.single_task="Navigate around obstacles" \ + --dataset.streaming_encoding=true \ + --dataset.encoder_threads=2 \ + # --dataset.vcodec=auto \ --display_data=true ``` diff --git a/docs/source/groot.mdx b/docs/source/groot.mdx index 8bfc22996..0ef591466 100644 --- a/docs/source/groot.mdx +++ b/docs/source/groot.mdx @@ -120,9 +120,12 @@ lerobot-record \ --display_data=true \ --dataset.repo_id=/eval_groot-bimanual \ --dataset.num_episodes=10 \ - --dataset.single_task="Grab and handover the red cube to the other arm" - --policy.path=/groot-bimanual # your trained model - --dataset.episode_time_s=30 + --dataset.single_task="Grab and handover the red cube to the other arm" \ + --dataset.streaming_encoding=true \ + --dataset.encoder_threads=2 \ + # --dataset.vcodec=auto \ + --policy.path=/groot-bimanual \ # your trained model + --dataset.episode_time_s=30 \ --dataset.reset_time_s=10 ``` diff --git a/docs/source/hope_jr.mdx b/docs/source/hope_jr.mdx index 026cd084a..8826d9758 100644 --- a/docs/source/hope_jr.mdx +++ b/docs/source/hope_jr.mdx @@ -230,6 +230,9 @@ lerobot-record \ --dataset.episode_time_s=5 \ --dataset.push_to_hub=true \ --dataset.private=true \ + --dataset.streaming_encoding=true \ + --dataset.encoder_threads=2 \ + # --dataset.vcodec=auto \ --display_data=true ``` @@ -273,5 +276,8 @@ lerobot-record \ --dataset.repo_id=/eval_hopejr \ --dataset.single_task="Evaluate hopejr hand policy" \ --dataset.num_episodes=10 \ + --dataset.streaming_encoding=true \ + --dataset.encoder_threads=2 \ + # --dataset.vcodec=auto \ --policy.path=outputs/train/hopejr_hand/checkpoints/last/pretrained_model ``` diff --git a/docs/source/il_robots.mdx b/docs/source/il_robots.mdx index 84dc6f2f6..7fc770b0c 100644 --- a/docs/source/il_robots.mdx +++ b/docs/source/il_robots.mdx @@ -185,7 +185,10 @@ lerobot-record \ --display_data=true \ --dataset.repo_id=${HF_USER}/record-test \ --dataset.num_episodes=5 \ - --dataset.single_task="Grab the black cube" + --dataset.single_task="Grab the black cube" \ + --dataset.streaming_encoding=true \ + # --dataset.vcodec=auto \ + --dataset.encoder_threads=2 ``` @@ -515,6 +518,9 @@ lerobot-record \ --display_data=false \ --dataset.repo_id=${HF_USER}/eval_so100 \ --dataset.single_task="Put lego brick into the transparent box" \ + --dataset.streaming_encoding=true \ + --dataset.encoder_threads=2 \ + # --dataset.vcodec=auto \ # <- Teleop optional if you want to teleoperate in between episodes \ # --teleop.type=so100_leader \ # --teleop.port=/dev/ttyACM0 \ diff --git a/docs/source/lerobot-dataset-v3.mdx b/docs/source/lerobot-dataset-v3.mdx index 3521914f2..235a355bd 100644 --- a/docs/source/lerobot-dataset-v3.mdx +++ b/docs/source/lerobot-dataset-v3.mdx @@ -41,7 +41,10 @@ lerobot-record \ --display_data=true \ --dataset.repo_id=${HF_USER}/record-test \ --dataset.num_episodes=5 \ - --dataset.single_task="Grab the black cube" + --dataset.single_task="Grab the black cube" \ + --dataset.streaming_encoding=true \ + # --dataset.vcodec=auto \ + --dataset.encoder_threads=2 ``` See the [recording guide](./il_robots#record-a-dataset) for more details. diff --git a/docs/source/reachy2.mdx b/docs/source/reachy2.mdx index 51b09acd2..1b868711a 100644 --- a/docs/source/reachy2.mdx +++ b/docs/source/reachy2.mdx @@ -159,6 +159,9 @@ lerobot-record \ --dataset.fps=15 \ --dataset.push_to_hub=true \ --dataset.private=true \ + --dataset.streaming_encoding=true \ + --dataset.encoder_threads=2 \ + # --dataset.vcodec=auto \ --display_data=true ``` @@ -198,6 +201,9 @@ lerobot-record \ --dataset.fps=15 \ --dataset.push_to_hub=true \ --dataset.private=true \ + --dataset.streaming_encoding=true \ + --dataset.encoder_threads=2 \ + # --dataset.vcodec=auto \ --display_data=true ``` diff --git a/docs/source/smolvla.mdx b/docs/source/smolvla.mdx index a56298b5e..bf8a0d2f0 100644 --- a/docs/source/smolvla.mdx +++ b/docs/source/smolvla.mdx @@ -106,6 +106,9 @@ lerobot-record \ --dataset.repo_id=${HF_USER}/eval_DATASET_NAME_test \ # <- This will be the dataset name on HF Hub --dataset.episode_time_s=50 \ --dataset.num_episodes=10 \ + --dataset.streaming_encoding=true \ + --dataset.encoder_threads=2 \ + # --dataset.vcodec=auto \ # <- Teleop optional if you want to teleoperate in between episodes \ # --teleop.type=so100_leader \ # --teleop.port=/dev/ttyACM0 \ diff --git a/docs/source/streaming_video_encoding.mdx b/docs/source/streaming_video_encoding.mdx new file mode 100644 index 000000000..40004200e --- /dev/null +++ b/docs/source/streaming_video_encoding.mdx @@ -0,0 +1,155 @@ +# Streaming Video Encoding Guide + +## 1. Overview + +Streaming video encoding eliminates the traditional PNG round-trip during video dataset recording. Instead of: + +1. Capture frame -> write PNG to disk -> (at episode end) read PNG's -> encode to MP4 -> delete PNG's + +Frames can be encoded in real-time during capture: + +1. Capture frame -> queue to encoder thread -> encode to MP4 directly + +This makes `save_episode()` near-instant (the video is already encoded by the time the episode ends) and removes the blocking wait that previously occurred between episodes, especially with multiple cameras in long episodes. + +## 2. Tuning Parameters + +| Parameter | CLI Flag | Type | Default | Description | +| ----------------------- | --------------------------------- | ------------- | ------------- | ----------------------------------------------------------------- | +| `streaming_encoding` | `--dataset.streaming_encoding` | `bool` | `True` | Enable real-time encoding during capture | +| `vcodec` | `--dataset.vcodec` | `str` | `"libsvtav1"` | Video codec. `"auto"` detects best HW encoder | +| `encoder_threads` | `--dataset.encoder_threads` | `int \| None` | `None` (auto) | Threads per encoder instance. `None` will leave the vcoded decide | +| `encoder_queue_maxsize` | `--dataset.encoder_queue_maxsize` | `int` | `60` | Max buffered frames per camera (~2s at 30fps). Consumes RAM | + +## 3. Performance Considerations + +Streaming encoding means the CPU is encoding video **during** the capture loop, not after. This creates a CPU budget that must be shared between: + +- **Control loop** (reading cameras, control the robot, writing non-video data) +- **Encoder threads** (one pool per camera) +- **Rerun visualization** (if enabled) +- **OS and other processes** + +### Resolution & Number of Cameras Impact + +| Setup | Throughput (px/sec) | CPU Encoding Load | Notes | +| ------------------------- | ------------------- | ----------------- | ------------------------------ | +| 2camsx 640x480x3 @30fps | 55M | Low | Works on most systems | +| 2camsx 1280x720x3 @30fps | 165M | Moderate | Comfortable on modern systems | +| 2camsx 1920x1080x3 @30fps | 373M | High | Requires powerful high-end CPU | + +### `encoder_threads` Tuning + +This parameter controls how many threads each encoder instance uses internally: + +- **Higher values** (e.g., 4-5): Faster encoding, but uses more CPU cores per camera. Good for high-end systems with many cores. +- **Lower values** (e.g., 1-2): Less CPU per camera, freeing cores for capture and visualization. Good for low-res images and capable CPUs. +- **`None` (default)**: Lets the codec decide. Information available in the codec logs. + +### Backpressure and Frame Dropping + +Each camera has a bounded queue (`encoder_queue_maxsize`, default 60 frames). When the encoder can't keep up: + +1. The queue fills up (consuming RAM) +2. New frames are **dropped** (not blocked) — the capture loop continues uninterrupted +3. A warning is logged: `"Encoder queue full for {camera}, dropped N frame(s)"` +4. At episode end, total dropped frames per camera are reported + +### Symptoms of Encoder Falling Behind + +- **System feels laggy and freezes**: all CPUs are at 100% +- **Dropped frame warnings** in the log or lower frames/FPS than expected in the recorded dataset +- **Choppy robot movement**: If CPU is severely overloaded, even the capture loop may be affected +- **Accumulated rerun lag**: Visualization falls behind real-time + +## 4. Hardware-Accelerated Encoding + +### When to Use + +Use HW encoding when: + +- CPU is the bottleneck (dropped frames, choppy robot, rerun lag) +- You have compatible hardware (GPU or dedicated encoder) +- You're recording at high throughput (high resolution or with many cameras) + +### Choosing a Codec + +| Codec | CPU Usage | File Size | Quality | Notes | +| --------------------- | --------- | -------------- | ------- | ---------------------------------------------------------------- | +| `libsvtav1` (default) | High | Smallest | Best | Default. Best compression but most CPU-intensive | +| `h264` | Medium | ~30-50% larger | Good | Software H.264. Lower CPU | +| HW encoders | Very Low | Largest | Good | Offloads to dedicated hardware. Best for CPU-constrained systems | + +### Available HW Encoders + +| Encoder | Platform | Hardware | CLI Value | +| ------------------- | ------------- | ------------------------------------------------------------------------------------------------ | ------------------------------------ | +| `h264_videotoolbox` | macOS | Apple Silicon / Intel | `--dataset.vcodec=h264_videotoolbox` | +| `hevc_videotoolbox` | macOS | Apple Silicon / Intel | `--dataset.vcodec=hevc_videotoolbox` | +| `h264_nvenc` | Linux/Windows | NVIDIA GPU | `--dataset.vcodec=h264_nvenc` | +| `hevc_nvenc` | Linux/Windows | NVIDIA GPU | `--dataset.vcodec=hevc_nvenc` | +| `h264_vaapi` | Linux | Intel/AMD GPU | `--dataset.vcodec=h264_vaapi` | +| `h264_qsv` | Linux/Windows | Intel Quick Sync | `--dataset.vcodec=h264_qsv` | +| `auto` | Any | Probes the system for available HW encoders. Falls back to `libsvtav1` if no HW encoder is found | `--dataset.vcodec=auto` | + +> [!NOTE] +> In order to use the HW accelerated encoders you might need to upgrade your GPU drivers. + +> [!NOTE] +> `libsvtav1` is the default because it provides the best training performance; other vcodecs can reduce CPU usage and be faster, but they typically produce larger files and may affect training time. + +## 5. Troubleshooting + +| Symptom | Likely Cause | Fix | +| ------------------------------------------------------------------ | -------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| System freezes or choppy robot movement or Rerun visualization lag | CPU starved (100% load usage) | Close other apps, reduce encoding throughput, lower `encoder_threads`, use `h264`, use `display_data=False`. If the CPU continues to be at 100% then it might be insufficient for your setup, consider `--dataset.streaming_encoding=false` or HW encoding (`--dataset.vcodec=auto`) | +| "Encoder queue full" warnings or dropped frames in dataset | Encoder can't keep up (Queue overflow) | If CPU is not at 100%: Increase `encoder_threads`, increase `encoder_queue_maxsize` or use HW encoding (`--dataset.vcodec=auto`). | +| High RAM usage | Queue filling faster than encoding | `encoder_threads` too low or CPU insufficient. Reduce `encoder_queue_maxsize` or use HW encoding | +| Large video files | Using HW encoder or H.264 | Expected trade-off. Switch to `libsvtav1` if CPU allows | +| `save_episode()` still slow | `streaming_encoding` is `False` | Set `--dataset.streaming_encoding=true` | +| Encoder thread crash | Codec not available or invalid settings | Check `vcodec` is installed, try `--dataset.vcodec=auto` | +| Recorded dataset is missing frames | CPU/GPU starvation or occasional load spikes | If ~5% of frames are missing, your system is likely overloaded — follow the recommendations above. If fewer frames are missing (~2%), they are probably due to occasional transient load spikes (often at startup) and can be considered expected. | + +## 6. Recommended Configurations + +These estimates are conservative; we recommend testing them on your setup—start with a low load and increase it gradually. + +### High-End Systems: modern 12+ cores (24+ threads) + +A throughput between ~250-500M px/sec should be comfortable in CPU. For even better results try HW encoding if available. + +```bash +# 3camsx 1280x720x3 @30fps: Defaults work well. Optionally increase encoder parallelism. +# 2camsx 1920x1080x3 @30fps: Defaults work well. Optionally increase encoder parallelism. +lerobot-record --dataset.encoder_threads=5 ... + +# 3camsx 1920x1080x3 @30fps: Might require some tuning. +``` + +### Mid-Range Systems: modern 8+ cores (16+ threads) or Apple Silicon + +A throughput between ~80-300M px/sec should be possible in CPU. + +```bash +# 3camsx 640x480x3 @30fps: Defaults work well. Optionally decrease encoder parallelism. +# 2camsx 1280x720x3 @30fps: Defaults work well. Optionally decrease encoder parallelism. +lerobot-record --dataset.encoder_threads=2 ... + +# 2camsx 1920x1080x3 @30fps: Might require some tuning. +``` + +### Low-Resource Systems: modern 4+ cores (8+ threads) or Raspberry Pi 5 + +On very constrained systems, streaming encoding may compete too heavily with the capture loop. Disabling it falls back to the PNG-based approach where encoding happens between episodes (blocking, but doesn't interfere with capture). Alternatively, record at a lower throughput to reduce both capture and encoding load. Consider also changing codec to `h264` and using batch encoding. + +```bash +# 2camsx 640x480x3 @30fps: Requires some tuning. + +# Use H.264, disable streaming, consider batching encoding +lerobot-record --dataset.vcodec=h264 --dataset.streaming_encoding=false ... +``` + +## 7. Closing note + +Performance ultimately depends on your exact setup — frames-per-second, resolution, CPU cores and load, available memory, episode length, and the encoder you choose. Always test with your target workload, be mindful about your CPU & system capabilities and tune `encoder_threads`, `encoder_queue_maxsize`, and +`vcodec` reasonably. That said, a common practical configuration (for many applications) is three cameras at 640×480x3 @30fps; this usually runs fine with the default streaming video encoding settings in modern systems. Always verify your recorded dataset is healthy by comparing the video duration to the CLI episode duration and confirming the row count equals FPS × CLI duration. diff --git a/docs/source/unitree_g1.mdx b/docs/source/unitree_g1.mdx index 4c5d28924..76e972dca 100644 --- a/docs/source/unitree_g1.mdx +++ b/docs/source/unitree_g1.mdx @@ -229,7 +229,10 @@ lerobot-record \ --dataset.num_episodes=2 \ --dataset.episode_time_s=5 \ --dataset.reset_time_s=5 \ - --dataset.push_to_hub=true + --dataset.push_to_hub=true \ + --dataset.streaming_encoding=true \ + # --dataset.vcodec=auto \ + --dataset.encoder_threads=2 ``` Example simulation dataset: [nepyope/teleop_test_sim](https://huggingface.co/datasets/nepyope/teleop_test_sim) @@ -279,7 +282,10 @@ lerobot-record \ --dataset.num_episodes=2 \ --dataset.episode_time_s=5 \ --dataset.reset_time_s=5 \ - --dataset.push_to_hub=true + --dataset.push_to_hub=true \ + --dataset.streaming_encoding=true \ + # --dataset.vcodec=auto \ + --dataset.encoder_threads=2 ``` **Note**: Update `server_address` to match your robot's camera server IP. diff --git a/src/lerobot/datasets/lerobot_dataset.py b/src/lerobot/datasets/lerobot_dataset.py index 360ed8d30..65b475e26 100644 --- a/src/lerobot/datasets/lerobot_dataset.py +++ b/src/lerobot/datasets/lerobot_dataset.py @@ -68,6 +68,7 @@ from lerobot.datasets.utils import ( write_tasks, ) from lerobot.datasets.video_utils import ( + StreamingVideoEncoder, VideoFrame, concatenate_video_files, decode_video_frames, @@ -75,11 +76,11 @@ from lerobot.datasets.video_utils import ( get_safe_default_codec, get_video_duration_in_s, get_video_info, + resolve_vcodec, ) from lerobot.utils.constants import HF_LEROBOT_HOME CODEBASE_VERSION = "v3.0" -VALID_VIDEO_CODECS = {"h264", "hevc", "libsvtav1"} class LeRobotDatasetMetadata: @@ -545,12 +546,19 @@ class LeRobotDatasetMetadata: def _encode_video_worker( - video_key: str, episode_index: int, root: Path, fps: int, vcodec: str = "libsvtav1" + video_key: str, + episode_index: int, + root: Path, + fps: int, + vcodec: str = "libsvtav1", + encoder_threads: int | None = None, ) -> Path: temp_path = Path(tempfile.mkdtemp(dir=root)) / f"{video_key}_{episode_index:03d}.mp4" fpath = DEFAULT_IMAGE_PATH.format(image_key=video_key, episode_index=episode_index, frame_index=0) img_dir = (root / fpath).parent - encode_video_frames(img_dir, temp_path, fps, vcodec=vcodec, overwrite=True) + encode_video_frames( + img_dir, temp_path, fps, vcodec=vcodec, overwrite=True, encoder_threads=encoder_threads + ) shutil.rmtree(img_dir) return temp_path @@ -570,6 +578,9 @@ class LeRobotDataset(torch.utils.data.Dataset): video_backend: str | None = None, batch_encoding_size: int = 1, vcodec: str = "libsvtav1", + streaming_encoding: bool = False, + encoder_queue_maxsize: int = 30, + encoder_threads: int | None = None, ): """ 2 modes are available for instantiating this class, depending on 2 different use cases: @@ -683,12 +694,17 @@ class LeRobotDataset(torch.utils.data.Dataset): batch_encoding_size (int, optional): Number of episodes to accumulate before batch encoding videos. Set to 1 for immediate encoding (default), or higher for batched encoding. Defaults to 1. vcodec (str, optional): Video codec for encoding videos during recording. Options: 'h264', 'hevc', - 'libsvtav1'. Defaults to 'libsvtav1'. Use 'h264' for faster encoding on systems where AV1 - encoding is CPU-heavy. + 'libsvtav1', 'auto', or hardware-specific codecs like 'h264_videotoolbox', 'h264_nvenc'. + Defaults to 'libsvtav1'. Use 'auto' to auto-detect the best available hardware encoder. + streaming_encoding (bool, optional): If True, encode video frames in real-time during capture + instead of writing PNG images first. This makes save_episode() near-instant. Defaults to False. + encoder_queue_maxsize (int, optional): Maximum number of frames to buffer per camera when using + streaming encoding. Defaults to 30 (~1s at 30fps). + encoder_threads (int | None, optional): Number of threads per encoder instance. None lets the + codec auto-detect (default). Lower values reduce CPU usage per encoder. Maps to 'lp' (via svtav1-params) for + libsvtav1 and 'threads' for h264/hevc. """ super().__init__() - if vcodec not in VALID_VIDEO_CODECS: - raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}") self.repo_id = repo_id self.root = Path(root) if root else HF_LEROBOT_HOME / repo_id self.image_transforms = image_transforms @@ -700,7 +716,8 @@ class LeRobotDataset(torch.utils.data.Dataset): self.delta_indices = None self.batch_encoding_size = batch_encoding_size self.episodes_since_last_encoding = 0 - self.vcodec = vcodec + self.vcodec = resolve_vcodec(vcodec) + self._encoder_threads = encoder_threads # Unused attributes self.image_writer = None @@ -708,6 +725,7 @@ class LeRobotDataset(torch.utils.data.Dataset): self.writer = None self.latest_episode = None self._current_file_start_frame = None # Track the starting frame index of the current parquet file + self._streaming_encoder = None self.root.mkdir(exist_ok=True, parents=True) @@ -749,6 +767,19 @@ class LeRobotDataset(torch.utils.data.Dataset): check_delta_timestamps(self.delta_timestamps, self.fps, self.tolerance_s) self.delta_indices = get_delta_indices(self.delta_timestamps, self.fps) + # Initialize streaming encoder for resumed recording + if streaming_encoding and len(self.meta.video_keys) > 0: + self._streaming_encoder = StreamingVideoEncoder( + fps=self.meta.fps, + vcodec=self.vcodec, + pix_fmt="yuv420p", + g=2, + crf=30, + preset=None, + queue_maxsize=encoder_queue_maxsize, + encoder_threads=encoder_threads, + ) + def _close_writer(self) -> None: """Close and cleanup the parquet writer if it exists.""" writer = getattr(self, "writer", None) @@ -1104,6 +1135,8 @@ class LeRobotDataset(torch.utils.data.Dataset): """ self._close_writer() self.meta._close_writer() + if self._streaming_encoder is not None: + self._streaming_encoder.close() def create_episode_buffer(self, episode_index: int | None = None) -> dict: current_ep_idx = self.meta.total_episodes if episode_index is None else episode_index @@ -1158,6 +1191,13 @@ class LeRobotDataset(torch.utils.data.Dataset): self.episode_buffer["timestamp"].append(timestamp) self.episode_buffer["task"].append(frame.pop("task")) # Remove task from frame after processing + # Start streaming encoder on first frame of episode (once, before iterating keys) + if frame_index == 0 and self._streaming_encoder is not None: + self._streaming_encoder.start_episode( + video_keys=list(self.meta.video_keys), + temp_dir=self.root, + ) + # Add frame features to episode_buffer for key in frame: if key not in self.features: @@ -1165,7 +1205,10 @@ class LeRobotDataset(torch.utils.data.Dataset): f"An element of the frame is not in the features. '{key}' not in '{self.features.keys()}'." ) - if self.features[key]["dtype"] in ["image", "video"]: + if self.features[key]["dtype"] == "video" and self._streaming_encoder is not None: + self._streaming_encoder.feed_frame(key, frame[key]) + self.episode_buffer[key].append(None) # Placeholder (video keys are skipped in parquet) + elif self.features[key]["dtype"] in ["image", "video"]: img_path = self._get_image_file_path( episode_index=self.episode_buffer["episode_index"], image_key=key, frame_index=frame_index ) @@ -1226,13 +1269,38 @@ class LeRobotDataset(torch.utils.data.Dataset): # Wait for image writer to end, so that episode stats over images can be computed self._wait_image_writer() - ep_stats = compute_episode_stats(episode_buffer, self.features) - ep_metadata = self._save_episode_data(episode_buffer) has_video_keys = len(self.meta.video_keys) > 0 + use_streaming = self._streaming_encoder is not None and has_video_keys use_batched_encoding = self.batch_encoding_size > 1 - if has_video_keys and not use_batched_encoding: + if use_streaming: + # Compute stats for non-video features only (video stats come from encoder) + non_video_buffer = { + k: v + for k, v in episode_buffer.items() + if self.features.get(k, {}).get("dtype") not in ("video",) + } + non_video_features = {k: v for k, v in self.features.items() if v["dtype"] != "video"} + ep_stats = compute_episode_stats(non_video_buffer, non_video_features) + else: + ep_stats = compute_episode_stats(episode_buffer, self.features) + + ep_metadata = self._save_episode_data(episode_buffer) + + if use_streaming: + # Finish streaming encoding and collect results + streaming_results = self._streaming_encoder.finish_episode() + for video_key in self.meta.video_keys: + temp_path, video_stats = streaming_results[video_key] + if video_stats is not None: + # Format stats same as compute_episode_stats: normalize to [0,1], reshape to (C,1,1) + ep_stats[video_key] = { + k: v if k == "count" else np.squeeze(v.reshape(1, -1, 1, 1) / 255.0, axis=0) + for k, v in video_stats.items() + } + ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path)) + elif has_video_keys and not use_batched_encoding: num_cameras = len(self.meta.video_keys) if parallel_encoding and num_cameras > 1: # TODO(Steven): Ideally we would like to control the number of threads per encoding such that: @@ -1246,6 +1314,7 @@ class LeRobotDataset(torch.utils.data.Dataset): self.root, self.fps, self.vcodec, + self._encoder_threads, ): video_key for video_key in self.meta.video_keys } @@ -1514,6 +1583,10 @@ class LeRobotDataset(torch.utils.data.Dataset): return metadata def clear_episode_buffer(self, delete_images: bool = True) -> None: + # Cancel streaming encoder if active + if self._streaming_encoder is not None: + self._streaming_encoder.cancel_episode() + # Clean up image files for the current episode buffer if delete_images: # Wait for the async image writer to finish @@ -1561,7 +1634,9 @@ class LeRobotDataset(torch.utils.data.Dataset): Note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speedup encoding, since video encoding with ffmpeg is already using multithreading. """ - return _encode_video_worker(video_key, episode_index, self.root, self.fps, self.vcodec) + return _encode_video_worker( + video_key, episode_index, self.root, self.fps, self.vcodec, self._encoder_threads + ) @classmethod def create( @@ -1578,10 +1653,12 @@ class LeRobotDataset(torch.utils.data.Dataset): video_backend: str | None = None, batch_encoding_size: int = 1, vcodec: str = "libsvtav1", + streaming_encoding: bool = False, + encoder_queue_maxsize: int = 30, + encoder_threads: int | None = None, ) -> "LeRobotDataset": """Create a LeRobot Dataset from scratch in order to record data.""" - if vcodec not in VALID_VIDEO_CODECS: - raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}") + vcodec = resolve_vcodec(vcodec) obj = cls.__new__(cls) obj.meta = LeRobotDatasetMetadata.create( repo_id=repo_id, @@ -1599,6 +1676,7 @@ class LeRobotDataset(torch.utils.data.Dataset): obj.batch_encoding_size = batch_encoding_size obj.episodes_since_last_encoding = 0 obj.vcodec = vcodec + obj._encoder_threads = encoder_threads if image_writer_processes or image_writer_threads: obj.start_image_writer(image_writer_processes, image_writer_threads) @@ -1620,6 +1698,22 @@ class LeRobotDataset(torch.utils.data.Dataset): obj._lazy_loading = False obj._recorded_frames = 0 obj._writer_closed_for_reading = False + + # Initialize streaming encoder + if streaming_encoding and len(obj.meta.video_keys) > 0: + obj._streaming_encoder = StreamingVideoEncoder( + fps=fps, + vcodec=vcodec, + pix_fmt="yuv420p", + g=2, + crf=30, + preset=None, + queue_maxsize=encoder_queue_maxsize, + encoder_threads=encoder_threads, + ) + else: + obj._streaming_encoder = None + return obj diff --git a/src/lerobot/datasets/video_utils.py b/src/lerobot/datasets/video_utils.py index 84ce13772..acc24a9e0 100644 --- a/src/lerobot/datasets/video_utils.py +++ b/src/lerobot/datasets/video_utils.py @@ -13,25 +13,106 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import contextlib import glob import importlib import logging +import queue import shutil import tempfile +import threading import warnings from dataclasses import dataclass, field +from fractions import Fraction from pathlib import Path from threading import Lock from typing import Any, ClassVar import av import fsspec +import numpy as np import pyarrow as pa import torch import torchvision from datasets.features.features import register_feature from PIL import Image +# List of hardware encoders to probe for auto-selection. Availability depends on the platform and FFmpeg build. +# Determines the order of preference for auto-selection when vcodec="auto" is used. +HW_ENCODERS = [ + "h264_videotoolbox", # macOS + "hevc_videotoolbox", # macOS + "h264_nvenc", # NVIDIA GPU + "hevc_nvenc", # NVIDIA GPU + "h264_vaapi", # Linux Intel/AMD + "h264_qsv", # Intel Quick Sync +] + +VALID_VIDEO_CODECS = {"h264", "hevc", "libsvtav1", "auto"} | set(HW_ENCODERS) + + +def _get_codec_options( + vcodec: str, + g: int | None = 2, + crf: int | None = 30, + preset: int | None = None, +) -> dict: + """Build codec-specific options dict for video encoding.""" + options = {} + + # GOP size (keyframe interval) - supported by VideoToolbox and software encoders + if g is not None and (vcodec in ("h264_videotoolbox", "hevc_videotoolbox") or vcodec not in HW_ENCODERS): + options["g"] = str(g) + + # Quality control (codec-specific parameter names) + if crf is not None: + if vcodec in ("h264", "hevc", "libsvtav1"): + options["crf"] = str(crf) + elif vcodec in ("h264_videotoolbox", "hevc_videotoolbox"): + quality = max(1, min(100, int(100 - crf * 2))) + options["q:v"] = str(quality) + elif vcodec in ("h264_nvenc", "hevc_nvenc"): + options["rc"] = "constqp" + options["qp"] = str(crf) + elif vcodec in ("h264_vaapi",): + options["qp"] = str(crf) + elif vcodec in ("h264_qsv",): + options["global_quality"] = str(crf) + + # Preset (only for libsvtav1) + if vcodec == "libsvtav1": + options["preset"] = str(preset) if preset is not None else "12" + + return options + + +def detect_available_hw_encoders() -> list[str]: + """Probe PyAV/FFmpeg for available hardware video encoders.""" + available = [] + for codec_name in HW_ENCODERS: + try: + av.codec.Codec(codec_name, "w") + available.append(codec_name) + except Exception: # nosec B110 + pass # nosec B110 + return available + + +def resolve_vcodec(vcodec: str) -> str: + """Validate vcodec and resolve 'auto' to best available HW encoder, fallback to libsvtav1.""" + if vcodec not in VALID_VIDEO_CODECS: + raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}") + if vcodec != "auto": + logging.info(f"Using video codec: {vcodec}") + return vcodec + available = detect_available_hw_encoders() + for encoder in HW_ENCODERS: + if encoder in available: + logging.info(f"Auto-selected video codec: {encoder}") + return encoder + logging.info("No hardware encoder available, falling back to software encoder 'libsvtav1'") + return "libsvtav1" + def get_safe_default_codec(): if importlib.util.find_spec("torchcodec"): @@ -309,14 +390,13 @@ def encode_video_frames( g: int | None = 2, crf: int | None = 30, fast_decode: int = 0, - log_level: int | None = av.logging.ERROR, + log_level: int | None = av.logging.WARNING, overwrite: bool = False, preset: int | None = None, + encoder_threads: int | None = None, ) -> None: """More info on ffmpeg arguments tuning on `benchmark/video/README.md`""" - # Check encoder availability - if vcodec not in ["h264", "hevc", "libsvtav1"]: - raise ValueError(f"Unsupported video codec: {vcodec}. Supported codecs are: h264, hevc, libsvtav1.") + vcodec = resolve_vcodec(vcodec) video_path = Path(video_path) imgs_dir = Path(imgs_dir) @@ -347,21 +427,22 @@ def encode_video_frames( width, height = dummy_image.size # Define video codec options - video_options = {} - - if g is not None: - video_options["g"] = str(g) - - if crf is not None: - video_options["crf"] = str(crf) + video_options = _get_codec_options(vcodec, g, crf, preset) if fast_decode: key = "svtav1-params" if vcodec == "libsvtav1" else "tune" value = f"fast-decode={fast_decode}" if vcodec == "libsvtav1" else "fastdecode" video_options[key] = value - if vcodec == "libsvtav1": - video_options["preset"] = str(preset) if preset is not None else "12" + if encoder_threads is not None: + if vcodec == "libsvtav1": + lp_param = f"lp={encoder_threads}" + if "svtav1-params" in video_options: + video_options["svtav1-params"] += f":{lp_param}" + else: + video_options["svtav1-params"] = lp_param + else: + video_options["threads"] = str(encoder_threads) # Set logging level if log_level is not None: @@ -480,6 +561,348 @@ def concatenate_video_files( Path(tmp_concatenate_path).unlink() +class _CameraEncoderThread(threading.Thread): + """A thread that encodes video frames streamed via a queue into an MP4 file. + + One instance is created per camera per episode. Frames are received as numpy arrays + from the main thread, encoded in real-time using PyAV (which releases the GIL during + encoding), and written to disk. Stats are computed incrementally using + RunningQuantileStats and returned via result_queue. + """ + + def __init__( + self, + video_path: Path, + fps: int, + vcodec: str, + pix_fmt: str, + g: int | None, + crf: int | None, + preset: int | None, + frame_queue: queue.Queue, + result_queue: queue.Queue, + stop_event: threading.Event, + encoder_threads: int | None = None, + ): + super().__init__(daemon=True) + self.video_path = video_path + self.fps = fps + self.vcodec = vcodec + self.pix_fmt = pix_fmt + self.g = g + self.crf = crf + self.preset = preset + self.frame_queue = frame_queue + self.result_queue = result_queue + self.stop_event = stop_event + self.encoder_threads = encoder_threads + + def run(self) -> None: + from lerobot.datasets.compute_stats import RunningQuantileStats, auto_downsample_height_width + + container = None + output_stream = None + stats_tracker = RunningQuantileStats() + frame_count = 0 + + try: + logging.getLogger("libav").setLevel(av.logging.WARNING) + + while True: + try: + frame_data = self.frame_queue.get(timeout=1) + except queue.Empty: + if self.stop_event.is_set(): + break + continue + + if frame_data is None: + # Sentinel: flush and close + break + + # Ensure HWC uint8 numpy array + if isinstance(frame_data, np.ndarray): + if frame_data.ndim == 3 and frame_data.shape[0] == 3: + # CHW -> HWC + frame_data = frame_data.transpose(1, 2, 0) + if frame_data.dtype != np.uint8: + frame_data = (frame_data * 255).astype(np.uint8) + + # Open container on first frame (to get width/height) + if container is None: + height, width = frame_data.shape[:2] + video_options = _get_codec_options(self.vcodec, self.g, self.crf, self.preset) + if self.encoder_threads is not None: + if self.vcodec == "libsvtav1": + lp_param = f"lp={self.encoder_threads}" + if "svtav1-params" in video_options: + video_options["svtav1-params"] += f":{lp_param}" + else: + video_options["svtav1-params"] = lp_param + else: + video_options["threads"] = str(self.encoder_threads) + Path(self.video_path).parent.mkdir(parents=True, exist_ok=True) + container = av.open(str(self.video_path), "w") + output_stream = container.add_stream(self.vcodec, self.fps, options=video_options) + output_stream.pix_fmt = self.pix_fmt + output_stream.width = width + output_stream.height = height + output_stream.time_base = Fraction(1, self.fps) + + # Encode frame with explicit timestamps + pil_img = Image.fromarray(frame_data) + video_frame = av.VideoFrame.from_image(pil_img) + video_frame.pts = frame_count + video_frame.time_base = Fraction(1, self.fps) + packet = output_stream.encode(video_frame) + if packet: + container.mux(packet) + + # Update stats with downsampled frame (per-channel stats like compute_episode_stats) + img_chw = frame_data.transpose(2, 0, 1) # HWC -> CHW + img_downsampled = auto_downsample_height_width(img_chw) + # Reshape CHW to (H*W, C) for per-channel stats + channels = img_downsampled.shape[0] + img_for_stats = img_downsampled.transpose(1, 2, 0).reshape(-1, channels) + stats_tracker.update(img_for_stats) + + frame_count += 1 + + # Flush encoder + if output_stream is not None: + packet = output_stream.encode() + if packet: + container.mux(packet) + + if container is not None: + container.close() + + av.logging.restore_default_callback() + + # Get stats and put on result queue + if frame_count >= 2: + stats = stats_tracker.get_statistics() + self.result_queue.put(("ok", stats)) + else: + self.result_queue.put(("ok", None)) + + except Exception as e: + logging.error(f"Encoder thread error: {e}") + if container is not None: + with contextlib.suppress(Exception): + container.close() + self.result_queue.put(("error", str(e))) + + +class StreamingVideoEncoder: + """Manages per-camera encoder threads for real-time video encoding during recording. + + Instead of writing frames as PNG images and then encoding to MP4 at episode end, + this class streams frames directly to encoder threads, eliminating the + PNG round-trip and making save_episode() near-instant. + + Uses threading instead of multiprocessing to avoid the overhead of pickling large + numpy arrays through multiprocessing.Queue. PyAV's encode() releases the GIL, + so encoding runs in parallel with the main recording loop. + """ + + def __init__( + self, + fps: int, + vcodec: str = "libsvtav1", + pix_fmt: str = "yuv420p", + g: int | None = 2, + crf: int | None = 30, + preset: int | None = None, + queue_maxsize: int = 30, + encoder_threads: int | None = None, + ): + self.fps = fps + self.vcodec = resolve_vcodec(vcodec) + self.pix_fmt = pix_fmt + self.g = g + self.crf = crf + self.preset = preset + self.queue_maxsize = queue_maxsize + self.encoder_threads = encoder_threads + + self._frame_queues: dict[str, queue.Queue] = {} + self._result_queues: dict[str, queue.Queue] = {} + self._threads: dict[str, _CameraEncoderThread] = {} + self._stop_events: dict[str, threading.Event] = {} + self._video_paths: dict[str, Path] = {} + self._dropped_frames: dict[str, int] = {} + self._episode_active = False + + def start_episode(self, video_keys: list[str], temp_dir: Path) -> None: + """Start encoder threads for a new episode. + + Args: + video_keys: List of video feature keys (e.g. ["observation.images.laptop"]) + temp_dir: Base directory for temporary MP4 files + """ + if self._episode_active: + self.cancel_episode() + + self._dropped_frames.clear() + + for video_key in video_keys: + frame_queue: queue.Queue = queue.Queue(maxsize=self.queue_maxsize) + result_queue: queue.Queue = queue.Queue(maxsize=1) + stop_event = threading.Event() + + temp_video_dir = Path(tempfile.mkdtemp(dir=temp_dir)) + video_path = temp_video_dir / f"{video_key.replace('/', '_')}_streaming.mp4" + + encoder_thread = _CameraEncoderThread( + video_path=video_path, + fps=self.fps, + vcodec=self.vcodec, + pix_fmt=self.pix_fmt, + g=self.g, + crf=self.crf, + preset=self.preset, + frame_queue=frame_queue, + result_queue=result_queue, + stop_event=stop_event, + encoder_threads=self.encoder_threads, + ) + encoder_thread.start() + + self._frame_queues[video_key] = frame_queue + self._result_queues[video_key] = result_queue + self._threads[video_key] = encoder_thread + self._stop_events[video_key] = stop_event + self._video_paths[video_key] = video_path + + self._episode_active = True + + def feed_frame(self, video_key: str, image: np.ndarray) -> None: + """Feed a frame to the encoder for a specific camera. + + A copy of the image is made before enqueueing to prevent race conditions + with camera drivers that may reuse buffers. If the encoder queue is full + (encoder can't keep up), the frame is dropped with a warning instead of + crashing the recording session. + + Args: + video_key: The video feature key + image: numpy array in (H,W,C) or (C,H,W) format, uint8 or float + + Raises: + RuntimeError: If the encoder thread has crashed + """ + if not self._episode_active: + raise RuntimeError("No active episode. Call start_episode() first.") + + thread = self._threads[video_key] + if not thread.is_alive(): + # Check for error + try: + status, msg = self._result_queues[video_key].get_nowait() + if status == "error": + raise RuntimeError(f"Encoder thread for {video_key} crashed: {msg}") + except queue.Empty: + pass + raise RuntimeError(f"Encoder thread for {video_key} is not alive") + + try: + self._frame_queues[video_key].put(image.copy(), timeout=0.1) + except queue.Full: + self._dropped_frames[video_key] = self._dropped_frames.get(video_key, 0) + 1 + count = self._dropped_frames[video_key] + # Log periodically to avoid spam (1st, then every 10th) + if count == 1 or count % 10 == 0: + logging.warning( + f"Encoder queue full for {video_key}, dropped {count} frame(s). " + f"Consider using vcodec='auto' for hardware encoding or increasing encoder_queue_maxsize." + ) + + def finish_episode(self) -> dict[str, tuple[Path, dict | None]]: + """Finish encoding the current episode. + + Sends sentinel values, waits for encoder threads to complete, + and collects results. + + Returns: + Dict mapping video_key to (mp4_path, stats_dict_or_None) + """ + if not self._episode_active: + raise RuntimeError("No active episode to finish.") + + results = {} + + # Report dropped frames + for video_key, count in self._dropped_frames.items(): + if count > 0: + logging.warning(f"Episode finished with {count} dropped frame(s) for {video_key}.") + + # Send sentinel to all queues + for video_key in self._frame_queues: + self._frame_queues[video_key].put(None) + + # Wait for all threads and collect results + for video_key in self._threads: + self._threads[video_key].join(timeout=120) + if self._threads[video_key].is_alive(): + logging.error(f"Encoder thread for {video_key} did not finish in time") + self._stop_events[video_key].set() + self._threads[video_key].join(timeout=5) + results[video_key] = (self._video_paths[video_key], None) + continue + + try: + status, data = self._result_queues[video_key].get(timeout=5) + if status == "error": + raise RuntimeError(f"Encoder thread for {video_key} failed: {data}") + results[video_key] = (self._video_paths[video_key], data) + except queue.Empty: + logging.error(f"No result from encoder thread for {video_key}") + results[video_key] = (self._video_paths[video_key], None) + + self._cleanup() + self._episode_active = False + return results + + def cancel_episode(self) -> None: + """Cancel the current episode, stopping encoder threads and cleaning up.""" + if not self._episode_active: + return + + # Signal all threads to stop + for video_key in self._stop_events: + self._stop_events[video_key].set() + + # Wait for threads to finish + for video_key in self._threads: + self._threads[video_key].join(timeout=5) + + # Clean up temp MP4 files + video_path = self._video_paths.get(video_key) + if video_path is not None and video_path.exists(): + shutil.rmtree(str(video_path.parent), ignore_errors=True) + + self._cleanup() + self._episode_active = False + + def close(self) -> None: + """Close the encoder, canceling any in-progress episode.""" + if self._episode_active: + self.cancel_episode() + + def _cleanup(self) -> None: + """Clean up queues and thread tracking dicts.""" + for q in self._frame_queues.values(): + with contextlib.suppress(Exception): + while not q.empty(): + q.get_nowait() + self._frame_queues.clear() + self._result_queues.clear() + self._threads.clear() + self._stop_events.clear() + self._video_paths.clear() + + @dataclass class VideoFrame: # TODO(rcadene, lhoestq): move to Hugging Face `datasets` repo @@ -514,7 +937,7 @@ with warnings.catch_warnings(): def get_audio_info(video_path: Path | str) -> dict: # Set logging level - logging.getLogger("libav").setLevel(av.logging.ERROR) + logging.getLogger("libav").setLevel(av.logging.WARNING) # Getting audio stream information audio_info = {} @@ -546,7 +969,7 @@ def get_audio_info(video_path: Path | str) -> dict: def get_video_info(video_path: Path | str) -> dict: # Set logging level - logging.getLogger("libav").setLevel(av.logging.ERROR) + logging.getLogger("libav").setLevel(av.logging.WARNING) # Getting video stream information video_info = {} @@ -632,8 +1055,15 @@ class VideoEncodingManager: return self def __exit__(self, exc_type, exc_val, exc_tb): - # Handle any remaining episodes that haven't been batch encoded - if self.dataset.episodes_since_last_encoding > 0: + streaming_encoder = getattr(self.dataset, "_streaming_encoder", None) + + if streaming_encoder is not None: + # Handle streaming encoder cleanup + if exc_type is not None: + streaming_encoder.cancel_episode() + streaming_encoder.close() + elif self.dataset.episodes_since_last_encoding > 0: + # Handle any remaining episodes that haven't been batch encoded if exc_type is not None: logging.info("Exception occurred. Encoding remaining episodes before exit...") else: @@ -650,8 +1080,8 @@ class VideoEncodingManager: # Finalize the dataset to properly close all writers self.dataset.finalize() - # Clean up episode images if recording was interrupted - if exc_type is not None: + # Clean up episode images if recording was interrupted (only for non-streaming mode) + if exc_type is not None and streaming_encoder is None: interrupted_episode_index = self.dataset.num_episodes for key in self.dataset.meta.video_keys: img_dir = self.dataset._get_image_file_path( @@ -665,14 +1095,12 @@ class VideoEncodingManager: # Clean up any remaining images directory if it's empty img_dir = self.dataset.root / "images" - # Check for any remaining PNG files - png_files = list(img_dir.rglob("*.png")) - if len(png_files) == 0: - # Only remove the images directory if no PNG files remain - if img_dir.exists(): + if img_dir.exists(): + png_files = list(img_dir.rglob("*.png")) + if len(png_files) == 0: shutil.rmtree(img_dir) logging.debug("Cleaned up empty images directory") - else: - logging.debug(f"Images directory is not empty, containing {len(png_files)} PNG files") + else: + logging.debug(f"Images directory is not empty, containing {len(png_files)} PNG files") return False # Don't suppress the original exception diff --git a/src/lerobot/scripts/lerobot_record.py b/src/lerobot/scripts/lerobot_record.py index 216ab22a6..ec04975d4 100644 --- a/src/lerobot/scripts/lerobot_record.py +++ b/src/lerobot/scripts/lerobot_record.py @@ -26,8 +26,10 @@ lerobot-record \ --dataset.repo_id=/ \ --dataset.num_episodes=2 \ --dataset.single_task="Grab the cube" \ + --dataset.streaming_encoding=true \ + --dataset.encoder_threads=2 \ --display_data=true - # <- Optional: specify video codec (h264, hevc, libsvtav1). Default is libsvtav1. \ + # <- Optional: specify video codec (auto, h264, hevc, libsvtav1). Default is libsvtav1. \ # --dataset.vcodec=h264 \ # <- Teleop optional if you want to teleoperate to record or in between episodes with a policy \ # --teleop.type=so100_leader \ @@ -58,7 +60,10 @@ lerobot-record \ --display_data=true \ --dataset.repo_id=${HF_USER}/bimanual-so-handover-cube \ --dataset.num_episodes=25 \ - --dataset.single_task="Grab and handover the red cube to the other arm" + --dataset.single_task="Grab and handover the red cube to the other arm" \ + --dataset.streaming_encoding=true \ + # --dataset.vcodec=auto \ + --dataset.encoder_threads=2 ``` """ @@ -179,9 +184,19 @@ class DatasetRecordConfig: # Number of episodes to record before batch encoding videos # Set to 1 for immediate encoding (default behavior), or higher for batched encoding video_encoding_batch_size: int = 1 - # Video codec for encoding videos. Options: 'h264', 'hevc', 'libsvtav1'. - # Use 'h264' for faster encoding on systems where AV1 encoding is CPU-heavy. + # Video codec for encoding videos. Options: 'h264', 'hevc', 'libsvtav1', 'auto', + # or hardware-specific: 'h264_videotoolbox', 'h264_nvenc', 'h264_vaapi', 'h264_qsv'. + # Use 'auto' to auto-detect the best available hardware encoder. vcodec: str = "libsvtav1" + # Enable streaming video encoding: encode frames in real-time during capture instead + # of writing PNG images first. Makes save_episode() near-instant. More info in the documentation: https://huggingface.co/docs/lerobot/streaming_video_encoding + streaming_encoding: bool = False + # Maximum number of frames to buffer per camera when using streaming encoding. + # ~1s buffer at 30fps. Provides backpressure if the encoder can't keep up. + encoder_queue_maxsize: int = 30 + # Number of threads per encoder instance. None = auto (codec default). + # Lower values reduce CPU usage, maps to 'lp' (via svtav1-params) for libsvtav1 and 'threads' for h264/hevc.. + encoder_threads: int | None = None # Rename map for the observation to override the image and state keys rename_map: dict[str, str] = field(default_factory=dict) @@ -452,6 +467,9 @@ def record(cfg: RecordConfig) -> LeRobotDataset: root=cfg.dataset.root, batch_encoding_size=cfg.dataset.video_encoding_batch_size, vcodec=cfg.dataset.vcodec, + streaming_encoding=cfg.dataset.streaming_encoding, + encoder_queue_maxsize=cfg.dataset.encoder_queue_maxsize, + encoder_threads=cfg.dataset.encoder_threads, ) if hasattr(robot, "cameras") and len(robot.cameras) > 0: @@ -474,6 +492,9 @@ def record(cfg: RecordConfig) -> LeRobotDataset: image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras), batch_encoding_size=cfg.dataset.video_encoding_batch_size, vcodec=cfg.dataset.vcodec, + streaming_encoding=cfg.dataset.streaming_encoding, + encoder_queue_maxsize=cfg.dataset.encoder_queue_maxsize, + encoder_threads=cfg.dataset.encoder_threads, ) # Load pretrained policy @@ -497,6 +518,11 @@ def record(cfg: RecordConfig) -> LeRobotDataset: listener, events = init_keyboard_listener() + if not cfg.dataset.streaming_encoding: + logging.info( + "Streaming encoding is disabled. If you have capable hardware, consider enabling it for way faster episode saving. --dataset.streaming_encoding=true --dataset.encoder_threads=2 # --dataset.vcodec=auto. More info in the documentation: https://huggingface.co/docs/lerobot/streaming_video_encoding" + ) + with VideoEncodingManager(dataset): recorded_episodes = 0 while recorded_episodes < cfg.dataset.num_episodes and not events["stop_recording"]: diff --git a/tests/datasets/test_datasets.py b/tests/datasets/test_datasets.py index 27c51b3c4..6f99eb301 100644 --- a/tests/datasets/test_datasets.py +++ b/tests/datasets/test_datasets.py @@ -31,7 +31,6 @@ from lerobot.configs.train import TrainPipelineConfig from lerobot.datasets.factory import make_dataset from lerobot.datasets.image_writer import image_array_to_pil_image from lerobot.datasets.lerobot_dataset import ( - VALID_VIDEO_CODECS, LeRobotDataset, MultiLeRobotDataset, _encode_video_worker, @@ -45,6 +44,7 @@ from lerobot.datasets.utils import ( hf_transform_to_torch, hw_to_dataset_features, ) +from lerobot.datasets.video_utils import VALID_VIDEO_CODECS from lerobot.envs.factory import make_env_config from lerobot.policies.factory import make_policy_config from lerobot.robots import make_robot_from_config @@ -393,7 +393,7 @@ def test_tmp_mixed_deletion(tmp_path, empty_lerobot_dataset_factory): vid_key: {"dtype": "video", "shape": DUMMY_HWC, "names": ["height", "width", "channels"]}, } ds_mixed = empty_lerobot_dataset_factory( - root=tmp_path / "mixed", features=features_mixed, batch_encoding_size=2 + root=tmp_path / "mixed", features=features_mixed, batch_encoding_size=2, streaming_encoding=False ) ds_mixed.add_frame( { @@ -1450,7 +1450,10 @@ def test_valid_video_codecs_constant(): assert "h264" in VALID_VIDEO_CODECS assert "hevc" in VALID_VIDEO_CODECS assert "libsvtav1" in VALID_VIDEO_CODECS - assert len(VALID_VIDEO_CODECS) == 3 + assert "auto" in VALID_VIDEO_CODECS + assert "h264_videotoolbox" in VALID_VIDEO_CODECS + assert "h264_nvenc" in VALID_VIDEO_CODECS + assert len(VALID_VIDEO_CODECS) == 10 def test_delta_timestamps_with_episodes_filter(tmp_path, empty_lerobot_dataset_factory): diff --git a/tests/datasets/test_streaming_video_encoder.py b/tests/datasets/test_streaming_video_encoder.py new file mode 100644 index 000000000..a85db6a8d --- /dev/null +++ b/tests/datasets/test_streaming_video_encoder.py @@ -0,0 +1,730 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for streaming video encoding and hardware-accelerated encoding.""" + +import queue +import threading +from unittest.mock import patch + +import av +import numpy as np +import pytest + +from lerobot.datasets.video_utils import ( + VALID_VIDEO_CODECS, + StreamingVideoEncoder, + _CameraEncoderThread, + _get_codec_options, + detect_available_hw_encoders, + resolve_vcodec, +) +from lerobot.utils.constants import OBS_IMAGES + +# ─── _get_codec_options tests ─── + + +class TestGetCodecOptions: + def test_libsvtav1_defaults(self): + opts = _get_codec_options("libsvtav1") + assert opts["g"] == "2" + assert opts["crf"] == "30" + assert opts["preset"] == "12" + + def test_libsvtav1_custom_preset(self): + opts = _get_codec_options("libsvtav1", preset=8) + assert opts["preset"] == "8" + + def test_h264_options(self): + opts = _get_codec_options("h264", g=10, crf=23) + assert opts["g"] == "10" + assert opts["crf"] == "23" + assert "preset" not in opts + + def test_videotoolbox_options(self): + opts = _get_codec_options("h264_videotoolbox", g=2, crf=30) + assert opts["g"] == "2" + # CRF 30 maps to quality = max(1, min(100, 100 - 30*2)) = 40 + assert opts["q:v"] == "40" + assert "crf" not in opts + + def test_nvenc_options(self): + opts = _get_codec_options("h264_nvenc", g=2, crf=25) + assert opts["rc"] == "constqp" + assert opts["qp"] == "25" + assert "crf" not in opts + # NVENC doesn't support g + assert "g" not in opts + + def test_vaapi_options(self): + opts = _get_codec_options("h264_vaapi", crf=28) + assert opts["qp"] == "28" + + def test_qsv_options(self): + opts = _get_codec_options("h264_qsv", crf=25) + assert opts["global_quality"] == "25" + + def test_no_g_no_crf(self): + opts = _get_codec_options("h264", g=None, crf=None) + assert "g" not in opts + assert "crf" not in opts + + +# ─── HW encoder detection tests ─── + + +class TestHWEncoderDetection: + def test_detect_available_hw_encoders_returns_list(self): + result = detect_available_hw_encoders() + assert isinstance(result, list) + + def test_detect_available_hw_encoders_only_valid(self): + from lerobot.datasets.video_utils import HW_ENCODERS + + result = detect_available_hw_encoders() + for encoder in result: + assert encoder in HW_ENCODERS + + def test_resolve_vcodec_passthrough(self): + assert resolve_vcodec("libsvtav1") == "libsvtav1" + assert resolve_vcodec("h264") == "h264" + + def test_resolve_vcodec_auto_fallback(self): + """When no HW encoders are available, auto should fall back to libsvtav1.""" + with patch("lerobot.datasets.video_utils.detect_available_hw_encoders", return_value=[]): + assert resolve_vcodec("auto") == "libsvtav1" + + def test_resolve_vcodec_auto_picks_hw(self): + """When a HW encoder is available, auto should pick it.""" + with patch( + "lerobot.datasets.video_utils.detect_available_hw_encoders", + return_value=["h264_videotoolbox"], + ): + assert resolve_vcodec("auto") == "h264_videotoolbox" + + def test_resolve_vcodec_auto_returns_valid(self): + """Test that resolve_vcodec('auto') returns a known valid codec.""" + result = resolve_vcodec("auto") + assert result in VALID_VIDEO_CODECS + + def test_hw_encoder_names_accepted_in_validation(self): + """Test that HW encoder names pass validation in VALID_VIDEO_CODECS.""" + assert "auto" in VALID_VIDEO_CODECS + assert "h264_videotoolbox" in VALID_VIDEO_CODECS + assert "h264_nvenc" in VALID_VIDEO_CODECS + + def test_resolve_vcodec_invalid_raises(self): + """Test that resolve_vcodec raises ValueError for invalid codecs.""" + with pytest.raises(ValueError, match="Invalid vcodec"): + resolve_vcodec("not_a_real_codec") + + +# ─── _CameraEncoderThread tests ─── + + +class TestCameraEncoderThread: + def test_encodes_valid_mp4(self, tmp_path): + """Test that the encoder thread creates a valid MP4 file with correct frame count.""" + num_frames = 30 + height, width = 64, 96 + fps = 30 + video_path = tmp_path / "test_output" / "test.mp4" + + frame_queue: queue.Queue = queue.Queue(maxsize=60) + result_queue: queue.Queue = queue.Queue(maxsize=1) + stop_event = threading.Event() + + encoder_thread = _CameraEncoderThread( + video_path=video_path, + fps=fps, + vcodec="libsvtav1", + pix_fmt="yuv420p", + g=2, + crf=30, + preset=13, + frame_queue=frame_queue, + result_queue=result_queue, + stop_event=stop_event, + ) + encoder_thread.start() + + # Feed frames (HWC uint8) + for _ in range(num_frames): + frame = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8) + frame_queue.put(frame) + + # Send sentinel + frame_queue.put(None) + encoder_thread.join(timeout=60) + assert not encoder_thread.is_alive() + + # Check result + status, data = result_queue.get(timeout=5) + assert status == "ok" + assert data is not None # Stats should be returned + assert "mean" in data + assert "std" in data + assert "min" in data + assert "max" in data + assert "count" in data + + # Verify the MP4 file is valid + assert video_path.exists() + with av.open(str(video_path)) as container: + stream = container.streams.video[0] + # The frame count should match + total_frames = sum(1 for _ in container.decode(stream)) + assert total_frames == num_frames + + def test_handles_chw_input(self, tmp_path): + """Test that CHW format input is handled correctly.""" + num_frames = 5 + fps = 30 + video_path = tmp_path / "test_chw" / "test.mp4" + + frame_queue: queue.Queue = queue.Queue(maxsize=60) + result_queue: queue.Queue = queue.Queue(maxsize=1) + stop_event = threading.Event() + + encoder_thread = _CameraEncoderThread( + video_path=video_path, + fps=fps, + vcodec="libsvtav1", + pix_fmt="yuv420p", + g=2, + crf=30, + preset=13, + frame_queue=frame_queue, + result_queue=result_queue, + stop_event=stop_event, + ) + encoder_thread.start() + + # Feed CHW frames + for _ in range(num_frames): + frame = np.random.randint(0, 255, (3, 64, 96), dtype=np.uint8) + frame_queue.put(frame) + + frame_queue.put(None) + encoder_thread.join(timeout=60) + + status, _ = result_queue.get(timeout=5) + assert status == "ok" + assert video_path.exists() + + def test_stop_event_cancellation(self, tmp_path): + """Test that setting the stop event causes the thread to exit.""" + fps = 30 + video_path = tmp_path / "test_cancel" / "test.mp4" + + frame_queue: queue.Queue = queue.Queue(maxsize=60) + result_queue: queue.Queue = queue.Queue(maxsize=1) + stop_event = threading.Event() + + encoder_thread = _CameraEncoderThread( + video_path=video_path, + fps=fps, + vcodec="libsvtav1", + pix_fmt="yuv420p", + g=2, + crf=30, + preset=13, + frame_queue=frame_queue, + result_queue=result_queue, + stop_event=stop_event, + ) + encoder_thread.start() + + # Feed a few frames + for _ in range(3): + frame = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + frame_queue.put(frame) + + # Signal stop instead of sending sentinel + stop_event.set() + encoder_thread.join(timeout=10) + assert not encoder_thread.is_alive() + + +# ─── StreamingVideoEncoder tests ─── + + +class TestStreamingVideoEncoder: + def test_single_camera_episode(self, tmp_path): + """Test encoding a single camera episode.""" + encoder = StreamingVideoEncoder(fps=30, vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30, preset=13) + + video_keys = [f"{OBS_IMAGES}.laptop"] + encoder.start_episode(video_keys, tmp_path) + + num_frames = 20 + for _ in range(num_frames): + frame = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + encoder.feed_frame(f"{OBS_IMAGES}.laptop", frame) + + results = encoder.finish_episode() + assert f"{OBS_IMAGES}.laptop" in results + + mp4_path, stats = results[f"{OBS_IMAGES}.laptop"] + assert mp4_path.exists() + assert stats is not None + + # Verify frame count + with av.open(str(mp4_path)) as container: + stream = container.streams.video[0] + total_frames = sum(1 for _ in container.decode(stream)) + assert total_frames == num_frames + + encoder.close() + + def test_multi_camera_episode(self, tmp_path): + """Test encoding multiple cameras simultaneously.""" + encoder = StreamingVideoEncoder(fps=30, vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30) + + video_keys = [f"{OBS_IMAGES}.laptop", f"{OBS_IMAGES}.phone"] + encoder.start_episode(video_keys, tmp_path) + + num_frames = 15 + for _ in range(num_frames): + frame0 = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + frame1 = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + encoder.feed_frame(video_keys[0], frame0) + encoder.feed_frame(video_keys[1], frame1) + + results = encoder.finish_episode() + + for key in video_keys: + assert key in results + mp4_path, stats = results[key] + assert mp4_path.exists() + assert stats is not None + + encoder.close() + + def test_sequential_episodes(self, tmp_path): + """Test that multiple sequential episodes work correctly.""" + encoder = StreamingVideoEncoder(fps=30, vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30) + video_keys = [f"{OBS_IMAGES}.cam"] + + for ep in range(3): + encoder.start_episode(video_keys, tmp_path) + num_frames = 10 + ep * 5 + for _ in range(num_frames): + frame = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + encoder.feed_frame(f"{OBS_IMAGES}.cam", frame) + results = encoder.finish_episode() + + mp4_path, stats = results[f"{OBS_IMAGES}.cam"] + assert mp4_path.exists() + + with av.open(str(mp4_path)) as container: + stream = container.streams.video[0] + total_frames = sum(1 for _ in container.decode(stream)) + assert total_frames == num_frames + + encoder.close() + + def test_cancel_episode(self, tmp_path): + """Test that canceling an episode cleans up properly.""" + encoder = StreamingVideoEncoder(fps=30, vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30) + video_keys = [f"{OBS_IMAGES}.cam"] + + encoder.start_episode(video_keys, tmp_path) + + for _ in range(5): + frame = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + encoder.feed_frame(f"{OBS_IMAGES}.cam", frame) + + encoder.cancel_episode() + + # Should be able to start a new episode after cancel + encoder.start_episode(video_keys, tmp_path) + for _ in range(5): + frame = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + encoder.feed_frame(f"{OBS_IMAGES}.cam", frame) + results = encoder.finish_episode() + + assert f"{OBS_IMAGES}.cam" in results + encoder.close() + + def test_feed_without_start_raises(self, tmp_path): + """Test that feeding frames without starting an episode raises.""" + encoder = StreamingVideoEncoder(fps=30, vcodec="libsvtav1", pix_fmt="yuv420p") + with pytest.raises(RuntimeError, match="No active episode"): + encoder.feed_frame("cam", np.zeros((64, 96, 3), dtype=np.uint8)) + encoder.close() + + def test_finish_without_start_raises(self, tmp_path): + """Test that finishing without starting raises.""" + encoder = StreamingVideoEncoder(fps=30, vcodec="libsvtav1", pix_fmt="yuv420p") + with pytest.raises(RuntimeError, match="No active episode"): + encoder.finish_episode() + encoder.close() + + def test_close_is_idempotent(self, tmp_path): + """Test that close() can be called multiple times safely.""" + encoder = StreamingVideoEncoder(fps=30, vcodec="libsvtav1", pix_fmt="yuv420p") + encoder.close() + encoder.close() # Should not raise + + def test_video_duration_matches_frame_count(self, tmp_path): + """Test that encoded video duration matches num_frames / fps.""" + encoder = StreamingVideoEncoder(fps=30, vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30, preset=13) + video_keys = [f"{OBS_IMAGES}.cam"] + encoder.start_episode(video_keys, tmp_path) + + num_frames = 90 # 3 seconds at 30fps + for _ in range(num_frames): + frame = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + encoder.feed_frame(f"{OBS_IMAGES}.cam", frame) + + results = encoder.finish_episode() + mp4_path, _ = results[f"{OBS_IMAGES}.cam"] + + expected_duration = num_frames / 30.0 # 3.0 seconds + + with av.open(str(mp4_path)) as container: + stream = container.streams.video[0] + total_frames = sum(1 for _ in container.decode(stream)) + if stream.duration is not None: + actual_duration = float(stream.duration * stream.time_base) + else: + actual_duration = float(container.duration / av.time_base) + + assert total_frames == num_frames + # Allow small tolerance for duration due to codec framing + assert abs(actual_duration - expected_duration) < 0.5, ( + f"Video duration {actual_duration:.2f}s != expected {expected_duration:.2f}s" + ) + + encoder.close() + + def test_multi_camera_start_episode_called_once(self, tmp_path): + """Test that with multiple cameras, no frames are lost due to double start_episode.""" + encoder = StreamingVideoEncoder(fps=30, vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30) + + video_keys = [f"{OBS_IMAGES}.cam1", f"{OBS_IMAGES}.cam2"] + encoder.start_episode(video_keys, tmp_path) + + num_frames = 30 + for _ in range(num_frames): + frame0 = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + frame1 = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + encoder.feed_frame(video_keys[0], frame0) + encoder.feed_frame(video_keys[1], frame1) + + results = encoder.finish_episode() + + # Both cameras should have all frames + for key in video_keys: + mp4_path, stats = results[key] + assert mp4_path.exists() + with av.open(str(mp4_path)) as container: + stream = container.streams.video[0] + total_frames = sum(1 for _ in container.decode(stream)) + assert total_frames == num_frames, ( + f"Camera {key}: expected {num_frames} frames, got {total_frames}" + ) + + encoder.close() + + def test_encoder_threads_passed_to_thread(self, tmp_path): + """Test that encoder_threads is stored and passed through to encoder threads.""" + encoder = StreamingVideoEncoder( + fps=30, vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30, encoder_threads=2 + ) + assert encoder.encoder_threads == 2 + + video_keys = [f"{OBS_IMAGES}.cam"] + encoder.start_episode(video_keys, tmp_path) + + # Verify the thread received the encoder_threads value + thread = encoder._threads[f"{OBS_IMAGES}.cam"] + assert thread.encoder_threads == 2 + + # Feed some frames and finish to ensure it works end-to-end + num_frames = 10 + for _ in range(num_frames): + frame = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + encoder.feed_frame(f"{OBS_IMAGES}.cam", frame) + + results = encoder.finish_episode() + mp4_path, stats = results[f"{OBS_IMAGES}.cam"] + assert mp4_path.exists() + assert stats is not None + + with av.open(str(mp4_path)) as container: + stream = container.streams.video[0] + total_frames = sum(1 for _ in container.decode(stream)) + assert total_frames == num_frames + + encoder.close() + + def test_encoder_threads_none_by_default(self, tmp_path): + """Test that encoder_threads defaults to None (codec auto-detect).""" + encoder = StreamingVideoEncoder(fps=30, vcodec="libsvtav1", pix_fmt="yuv420p") + assert encoder.encoder_threads is None + encoder.close() + + def test_graceful_frame_dropping(self, tmp_path): + """Test that full queue drops frames instead of crashing.""" + encoder = StreamingVideoEncoder( + fps=30, vcodec="libsvtav1", pix_fmt="yuv420p", g=2, crf=30, preset=13, queue_maxsize=1 + ) + video_keys = [f"{OBS_IMAGES}.cam"] + encoder.start_episode(video_keys, tmp_path) + + # Feed many frames quickly - with queue_maxsize=1, some will be dropped + num_frames = 50 + for _ in range(num_frames): + frame = np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8) + encoder.feed_frame(f"{OBS_IMAGES}.cam", frame) + + # Should not raise - frames are dropped gracefully + results = encoder.finish_episode() + assert f"{OBS_IMAGES}.cam" in results + + mp4_path, _ = results[f"{OBS_IMAGES}.cam"] + assert mp4_path.exists() + + # Some frames should have been dropped (queue was tiny) + dropped = encoder._dropped_frames.get(f"{OBS_IMAGES}.cam", 0) + # We can't guarantee drops but can verify no crash occurred + assert dropped >= 0 + + encoder.close() + + +# ─── Integration tests with LeRobotDataset ─── + + +class TestStreamingEncoderIntegration: + def test_add_frame_save_episode_streaming(self, tmp_path): + """Full integration test: add_frame -> save_episode with streaming encoding.""" + from lerobot.datasets.lerobot_dataset import LeRobotDataset + + features = { + "observation.images.cam": { + "dtype": "video", + "shape": (64, 96, 3), + "names": ["height", "width", "channels"], + }, + "action": {"dtype": "float32", "shape": (6,), "names": ["j1", "j2", "j3", "j4", "j5", "j6"]}, + } + + dataset = LeRobotDataset.create( + repo_id="test/streaming", + fps=30, + features=features, + root=tmp_path / "streaming_test", + use_videos=True, + streaming_encoding=True, + ) + + assert dataset._streaming_encoder is not None + + num_frames = 20 + for _ in range(num_frames): + frame = { + "observation.images.cam": np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8), + "action": np.random.randn(6).astype(np.float32), + "task": "test task", + } + dataset.add_frame(frame) + + dataset.save_episode() + + # Verify dataset metadata + assert dataset.meta.total_episodes == 1 + assert dataset.meta.total_frames == num_frames + + # Verify stats exist for the video key + assert dataset.meta.stats is not None + assert "observation.images.cam" in dataset.meta.stats + assert "action" in dataset.meta.stats + + dataset.finalize() + + def test_streaming_disabled_creates_pngs(self, tmp_path): + """Test that disabling streaming encoding falls back to PNG path.""" + from lerobot.datasets.lerobot_dataset import LeRobotDataset + + features = { + "observation.images.cam": { + "dtype": "video", + "shape": (64, 96, 3), + "names": ["height", "width", "channels"], + }, + "action": {"dtype": "float32", "shape": (6,), "names": ["j1", "j2", "j3", "j4", "j5", "j6"]}, + } + + dataset = LeRobotDataset.create( + repo_id="test/no_streaming", + fps=30, + features=features, + root=tmp_path / "no_streaming_test", + use_videos=True, + streaming_encoding=False, + ) + + assert dataset._streaming_encoder is None + + num_frames = 5 + for _ in range(num_frames): + frame = { + "observation.images.cam": np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8), + "action": np.random.randn(6).astype(np.float32), + "task": "test task", + } + dataset.add_frame(frame) + + # With streaming disabled, PNG files should be written + images_dir = dataset.root / "images" + assert images_dir.exists() + + dataset.save_episode() + dataset.finalize() + + def test_multi_episode_streaming(self, tmp_path): + """Test recording multiple episodes with streaming encoding.""" + from lerobot.datasets.lerobot_dataset import LeRobotDataset + + features = { + "observation.images.cam": { + "dtype": "video", + "shape": (64, 96, 3), + "names": ["height", "width", "channels"], + }, + "action": {"dtype": "float32", "shape": (2,), "names": ["j1", "j2"]}, + } + + dataset = LeRobotDataset.create( + repo_id="test/multi_ep", + fps=30, + features=features, + root=tmp_path / "multi_ep_test", + use_videos=True, + streaming_encoding=True, + ) + + for ep in range(3): + num_frames = 10 + ep * 5 + for _ in range(num_frames): + frame = { + "observation.images.cam": np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8), + "action": np.random.randn(2).astype(np.float32), + "task": f"task_{ep}", + } + dataset.add_frame(frame) + dataset.save_episode() + + assert dataset.meta.total_episodes == 3 + assert dataset.meta.total_frames == 10 + 15 + 20 + + dataset.finalize() + + def test_clear_episode_buffer_cancels_streaming(self, tmp_path): + """Test that clearing episode buffer cancels streaming encoding.""" + from lerobot.datasets.lerobot_dataset import LeRobotDataset + + features = { + "observation.images.cam": { + "dtype": "video", + "shape": (64, 96, 3), + "names": ["height", "width", "channels"], + }, + "action": {"dtype": "float32", "shape": (2,), "names": ["j1", "j2"]}, + } + + dataset = LeRobotDataset.create( + repo_id="test/cancel", + fps=30, + features=features, + root=tmp_path / "cancel_test", + use_videos=True, + streaming_encoding=True, + ) + + # Add some frames + for _ in range(5): + frame = { + "observation.images.cam": np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8), + "action": np.random.randn(2).astype(np.float32), + "task": "task", + } + dataset.add_frame(frame) + + # Cancel and re-record + dataset.clear_episode_buffer() + + # Record a new episode + for _ in range(10): + frame = { + "observation.images.cam": np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8), + "action": np.random.randn(2).astype(np.float32), + "task": "task", + } + dataset.add_frame(frame) + dataset.save_episode() + + assert dataset.meta.total_episodes == 1 + assert dataset.meta.total_frames == 10 + + dataset.finalize() + + def test_multi_camera_add_frame_streaming(self, tmp_path): + """Test that start_episode is called once with multiple video keys.""" + from lerobot.datasets.lerobot_dataset import LeRobotDataset + + features = { + "observation.images.cam1": { + "dtype": "video", + "shape": (64, 96, 3), + "names": ["height", "width", "channels"], + }, + "observation.images.cam2": { + "dtype": "video", + "shape": (64, 96, 3), + "names": ["height", "width", "channels"], + }, + "action": {"dtype": "float32", "shape": (2,), "names": ["j1", "j2"]}, + } + + dataset = LeRobotDataset.create( + repo_id="test/multi_cam", + fps=30, + features=features, + root=tmp_path / "multi_cam_test", + use_videos=True, + streaming_encoding=True, + ) + + num_frames = 15 + for _ in range(num_frames): + frame = { + "observation.images.cam1": np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8), + "observation.images.cam2": np.random.randint(0, 255, (64, 96, 3), dtype=np.uint8), + "action": np.random.randn(2).astype(np.float32), + "task": "test task", + } + dataset.add_frame(frame) + + dataset.save_episode() + + assert dataset.meta.total_episodes == 1 + assert dataset.meta.total_frames == num_frames + + dataset.finalize()