From f9fd0fb841d318b16635fbd1fb70171e04852861 Mon Sep 17 00:00:00 2001
From: CarolinePascal
Date: Mon, 24 Nov 2025 21:51:33 +0100
Subject: [PATCH] feat(multi-processes): adding support for multiprocess encoding

---
 src/lerobot/datasets/lerobot_dataset.py | 58 ++++++++++++++++++++++++++++++---
 src/lerobot/datasets/video_utils.py     |  2 +-
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/src/lerobot/datasets/lerobot_dataset.py b/src/lerobot/datasets/lerobot_dataset.py
index a6840891f..381c91b70 100644
--- a/src/lerobot/datasets/lerobot_dataset.py
+++ b/src/lerobot/datasets/lerobot_dataset.py
@@ -27,6 +27,7 @@ import pandas as pd
 import PIL.Image
 import pyarrow as pa
 import pyarrow.parquet as pq
+from concurrent.futures import ProcessPoolExecutor
 import torch
 import torch.utils
 from huggingface_hub import HfApi, snapshot_download
@@ -1149,8 +1150,9 @@ class LeRobotDataset(torch.utils.data.Dataset):
         use_batched_encoding = self.batch_encoding_size > 1
 
         if has_video_keys and not use_batched_encoding:
-            for video_key in self.meta.video_keys:
-                ep_metadata.update(self._save_episode_video(video_key, episode_index))
+            video_paths = self._encode_multiple_temporary_episode_videos(self.meta.video_keys, episode_index)
+            for (video_key, video_path) in zip(self.meta.video_keys, video_paths):
+                ep_metadata.update(self._save_episode_video(video_key, episode_index, video_path))
 
         # `meta.save_episode` need to be executed after encoding the videos
         self.meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats, ep_metadata)
@@ -1315,9 +1317,12 @@ class LeRobotDataset(torch.utils.data.Dataset):
 
         return metadata
 
-    def _save_episode_video(self, video_key: str, episode_index: int) -> dict:
+    def _save_episode_video(self, video_key: str, episode_index: int, video_path: str | Path | None = None) -> dict:
         # Encode episode frames into a temporary video
-        ep_path = self._encode_temporary_episode_video(video_key, episode_index)
+        if video_path is None:
+            ep_path = self._encode_temporary_episode_video(video_key, episode_index)
+        else:
+            ep_path = video_path
         ep_size_in_mb = get_file_size_in_mb(ep_path)
         ep_duration_in_s = get_video_duration_in_s(ep_path)
 
@@ -1441,6 +1446,51 @@ class LeRobotDataset(torch.utils.data.Dataset):
         shutil.rmtree(img_dir)
         return temp_path
 
+    def _encode_multiple_temporary_episode_videos(
+        self, video_keys: list[str], episode_index: int
+    ) -> list[Path]:
+        """
+        Encode the frames of several video keys of one episode into temporary mp4 videos in parallel.
+
+        Each video key is encoded by `encode_video_frames` in its own worker process. The image
+        directories are removed only once every encoding has succeeded, so the frames of an
+        episode are never deleted when one of the encodings failed.
+
+        Args:
+            video_keys: Video keys whose frames must be encoded.
+            episode_index: Index of the episode to encode.
+
+        Returns:
+            The temporary video paths, in the same order as `video_keys`.
+
+        Raises:
+            Exception: Any exception raised by `encode_video_frames` in a worker process.
+        """
+        video_keys = list(video_keys)
+        if not video_keys:
+            return []
+
+        img_dirs = [self._get_image_file_dir(episode_index, video_key) for video_key in video_keys]
+        temp_paths = [
+            Path(tempfile.mkdtemp(dir=self.root)) / f"{video_key}_{episode_index:03d}.mp4"
+            for video_key in video_keys
+        ]
+
+        # One worker per video is enough: do not start a process for every CPU core.
+        with ProcessPoolExecutor(max_workers=len(video_keys)) as executor:
+            futures = [
+                executor.submit(encode_video_frames, img_dir, temp_path, self.fps, overwrite=True)
+                for img_dir, temp_path in zip(img_dirs, temp_paths)
+            ]
+            # `result()` re-raises worker exceptions, which would otherwise be silently dropped.
+            for future in futures:
+                future.result()
+
+        for img_dir in img_dirs:
+            shutil.rmtree(img_dir)
+
+        return temp_paths
+
     @classmethod
     def create(
         cls,
diff --git a/src/lerobot/datasets/video_utils.py b/src/lerobot/datasets/video_utils.py
index 04deb2bc1..f9e521c44 100644
--- a/src/lerobot/datasets/video_utils.py
+++ b/src/lerobot/datasets/video_utils.py
@@ -310,7 +310,7 @@ def encode_video_frames(
     crf: int | None = 30,
     fast_decode: int = 0,
     log_level: int | None = av.logging.ERROR,
-    overwrite: bool = False,
+    overwrite: bool = True,
 ) -> None:
     """More info on ffmpeg arguments tuning on `benchmark/video/README.md`"""
     # Check encoder availability