diff --git a/src/lerobot/datasets/dataset_tools.py b/src/lerobot/datasets/dataset_tools.py index 63fa23f5c..adbb841c4 100644 --- a/src/lerobot/datasets/dataset_tools.py +++ b/src/lerobot/datasets/dataset_tools.py @@ -1920,21 +1920,31 @@ def reencode_dataset( :func:`reencode_video`. ``None`` lets the codec decide. num_workers: Number of parallel processes. ``None`` or ``0`` means sequential (no multiprocessing); ``1+`` spawns a - :class:`~multiprocessing.pool.Pool`. + :class:`~concurrent.futures.ProcessPoolExecutor`. Returns: The same :class:`LeRobotDataset` instance with its metadata updated on disk. """ meta = dataset.meta - video_paths_list = sorted((meta.root / VIDEO_DIR).rglob("*.mp4")) + video_paths_list = [] + + # Only re-encode if the videos are not already encoded with the given video encoding parameters + for video_key in meta.video_keys: + current_info = meta.info.features[video_key].get("info", {}) + current_encoder = VideoEncoderConfig.from_video_info(current_info) + if current_encoder != camera_encoder: + video_paths_list.extend((meta.root / VIDEO_DIR / video_key).rglob("*.mp4")) + else: + logging.info(f"{video_key} videos are already encoded with {camera_encoder}. Nothing to do.") + if len(video_paths_list) == 0: logging.warning("Dataset has no videos to re-encode.") return dataset logging.info(f"Re-encoding {len(video_paths_list)} video file(s) with {camera_encoder}") worker_args = [(vp, camera_encoder, encoder_threads) for vp in video_paths_list] - if num_workers and num_workers >= 1: + if num_workers and num_workers > 1: with ProcessPoolExecutor(max_workers=num_workers) as pool: futures = [pool.submit(_reencode_video_worker, args) for args in worker_args] for future in tqdm( diff --git a/src/lerobot/datasets/video_utils.py b/src/lerobot/datasets/video_utils.py index 2be13dbac..99122381a 100644 --- a/src/lerobot/datasets/video_utils.py +++ b/src/lerobot/datasets/video_utils.py @@ -449,7 +449,9 @@ def reencode_video( except IndexError as e: raise ValueError(f"No video stream in {input_video_path}") from e - fps = int(in_stream.base_rate) + fps = ( + in_stream.base_rate + ) # We allow fractional fps though LeRobotDataset only supports integer fps width = int(in_stream.width) height = int(in_stream.height) @@ -474,6 +476,8 @@ def reencode_video( packet = out_stream.encode() if packet: dst.mux(packet) + + shutil.move(tmp_output_video_path, output_video_path) except Exception: Path(tmp_output_video_path).unlink(missing_ok=True) raise @@ -481,8 +485,6 @@ def reencode_video( if log_level is not None: av.logging.restore_default_callback() - shutil.move(tmp_output_video_path, output_video_path) - if not output_video_path.exists(): raise OSError(f"Video re-encoding did not work. File not found: {output_video_path}.") diff --git a/src/lerobot/scripts/lerobot_edit_dataset.py b/src/lerobot/scripts/lerobot_edit_dataset.py index 8b0dd5ebb..3c1edbb31 100644 --- a/src/lerobot/scripts/lerobot_edit_dataset.py +++ b/src/lerobot/scripts/lerobot_edit_dataset.py @@ -178,7 +178,7 @@ Recompute stats for relative actions and push to hub: --operation.num_workers 4 \ --push_to_hub true -Re-encode all videos in a dataset in-place with a new codec: +Re-encode all videos in a dataset (saves to lerobot/pusht_reencoded by default): lerobot-edit-dataset \ --repo_id lerobot/pusht \ --operation.type reencode_videos \ @@ -195,6 +195,14 @@ Re-encode videos into a new dataset using 4 parallel processes: --operation.camera_encoder.crf 23 \ --operation.num_workers 4 +Re-encode videos in-place (overwrites original dataset): + lerobot-edit-dataset \ + --repo_id lerobot/pusht \ + --new_repo_id lerobot/pusht \ + --operation.type reencode_videos \ + --operation.camera_encoder.vcodec h264 \ + --operation.overwrite true + Using JSON config file: lerobot-edit-dataset \ --config_path path/to/edit_config.json @@ -212,7 +220,6 @@ import draccus from lerobot.configs import VideoEncoderConfig, camera_encoder_defaults, parser from lerobot.datasets import ( LeRobotDataset, - LeRobotDatasetMetadata, convert_image_to_video_dataset, delete_episodes, merge_datasets, @@ -293,6 +300,7 @@ class ReencodeVideosConfig(OperationConfig): camera_encoder: VideoEncoderConfig = field(default_factory=camera_encoder_defaults) num_workers: int = 0 encoder_threads: int | None = None + overwrite: bool = False @OperationConfig.register_subclass("info") @@ -665,40 +673,40 @@ def handle_reencode_videos(cfg: EditDatasetConfig) -> None: if not isinstance(cfg.operation, ReencodeVideosConfig): raise ValueError("Operation config must be ReencodeVideosConfig") - meta = LeRobotDatasetMetadata(cfg.repo_id, root=cfg.root) - - first_video_key = meta.video_keys[0] if meta.video_keys else None - if first_video_key is not None: - current_info = meta.features[first_video_key].get("info", {}) - current_encoder = VideoEncoderConfig.from_video_info(current_info) - if current_encoder == cfg.operation.camera_encoder: - logging.info( - f"Videos in {cfg.repo_id} are already encoded with {current_encoder}. Nothing to do." - ) - return - else: - raise ValueError("Dataset has no video features — nothing to re-encode.") - - output_repo_id, input_path, output_path = _resolve_io_paths( - cfg.repo_id, cfg.new_repo_id, cfg.root, cfg.new_root + output_repo_id, input_root, output_root = _resolve_io_paths( + cfg.repo_id, + cfg.new_repo_id, + cfg.root, + cfg.new_root, + default_new_repo_id=f"{cfg.repo_id}_reencoded", ) + in_place = output_root == input_root - if output_path == input_path: - backup_path = input_path.with_name(input_path.name + "_old") - logging.info(f"In-place re-encode — backing up dataset to {backup_path}") - if backup_path.exists(): - shutil.rmtree(backup_path) - shutil.copytree(input_path, backup_path) + if in_place and not cfg.operation.overwrite: + raise ValueError( + f"reencode_videos would overwrite the dataset in-place at {input_root}. " + "Pass --operation.overwrite true to allow in-place modification, " + "or use --new_repo_id / --new_root to write to a different location. " + f"Default output repo_id when neither is set: '{cfg.repo_id}_reencoded'." + ) + + if in_place: + logging.warning( + f"Overwriting dataset videos in-place at {input_root}. The original videos will be lost." + ) + dataset = LeRobotDataset(cfg.repo_id, root=input_root) else: - logging.info(f"Copying dataset from {input_path} to {output_path}") - if output_path.exists(): - shutil.rmtree(output_path) - shutil.copytree(input_path, output_path) + logging.info(f"Copying dataset from {input_root} to {output_root}") + if output_root.exists(): + backup_path = output_root.with_name(output_root.name + "_old") + logging.warning(f"Output directory {output_root} already exists. Moving to {backup_path}") + if backup_path.exists(): + shutil.rmtree(backup_path) + shutil.move(output_root, backup_path) + shutil.copytree(input_root, output_root) + dataset = LeRobotDataset(output_repo_id, root=output_root) logging.info(f"Re-encoding videos in {output_repo_id} with {cfg.operation.camera_encoder}") - - dataset = LeRobotDataset(output_repo_id, root=output_path) - reencode_dataset( dataset, camera_encoder=cfg.operation.camera_encoder,