fix(benchmark) : fixing video benchmark (#2094)

* fix(time benchmark): removing deprecated TimeBenchmark dependency

* fix(typo): renaming frames in an up-to-date fashion

* feat(duets): rearanging crf and g parameters in a proper unique combination manner

* fix(segfault): fixing segfault by adding a lock in ThreadPoolExecutor

* chore(update) : update datasets, codecs and backends to the latest versions

* chore(unused files): removing unused files

* fix(dataset paths): fix datasets paths to live among lerobot datasets
This commit is contained in:
Caroline Pascal
2025-11-26 17:41:31 +01:00
committed by GitHub
parent 581dd45eae
commit 648ea8f485
3 changed files with 43 additions and 244 deletions
-94
View File
@@ -1,94 +0,0 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import time
from contextlib import ContextDecorator
class TimeBenchmark(ContextDecorator):
"""
Measures execution time using a context manager or decorator.
This class supports both context manager and decorator usage, and is thread-safe for multithreaded
environments.
Args:
print: If True, prints the elapsed time upon exiting the context or completing the function. Defaults
to False.
Examples:
Using as a context manager:
>>> benchmark = TimeBenchmark()
>>> with benchmark:
... time.sleep(1)
>>> print(f"Block took {benchmark.result:.4f} seconds")
Block took approximately 1.0000 seconds
Using with multithreading:
```python
import threading
benchmark = TimeBenchmark()
def context_manager_example():
with benchmark:
time.sleep(0.01)
print(f"Block took {benchmark.result_ms:.2f} milliseconds")
threads = []
for _ in range(3):
t1 = threading.Thread(target=context_manager_example)
threads.append(t1)
for t in threads:
t.start()
for t in threads:
t.join()
```
Expected output:
Block took approximately 10.00 milliseconds
Block took approximately 10.00 milliseconds
Block took approximately 10.00 milliseconds
"""
def __init__(self, print=False):
self.local = threading.local()
self.print_time = print
def __enter__(self):
self.local.start_time = time.perf_counter()
return self
def __exit__(self, *exc):
self.local.end_time = time.perf_counter()
self.local.elapsed_time = self.local.end_time - self.local.start_time
if self.print_time:
print(f"Elapsed time: {self.local.elapsed_time:.4f} seconds")
return False
@property
def result(self):
return getattr(self.local, "elapsed_time", None)
@property
def result_ms(self):
return self.result * 1e3
-102
View File
@@ -1,102 +0,0 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Capture video feed from a camera as raw images."""
import argparse
import datetime as dt
import os
import time
from pathlib import Path
import cv2
import rerun as rr
# see https://rerun.io/docs/howto/visualization/limit-ram
RERUN_MEMORY_LIMIT = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "5%")
def display_and_save_video_stream(output_dir: Path, fps: int, width: int, height: int, duration: int):
rr.init("lerobot_capture_camera_feed")
rr.spawn(memory_limit=RERUN_MEMORY_LIMIT)
now = dt.datetime.now()
capture_dir = output_dir / f"{now:%Y-%m-%d}" / f"{now:%H-%M-%S}"
if not capture_dir.exists():
capture_dir.mkdir(parents=True, exist_ok=True)
# Opens the default webcam
cap = cv2.VideoCapture(0)
if not cap.isOpened():
print("Error: Could not open video stream.")
return
cap.set(cv2.CAP_PROP_FPS, fps)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
frame_index = 0
start_time = time.time()
while time.time() - start_time < duration:
ret, frame = cap.read()
if not ret:
print("Error: Could not read frame.")
break
rr.log("video/stream", rr.Image(frame), static=True)
cv2.imwrite(str(capture_dir / f"frame_{frame_index:06d}.png"), frame)
frame_index += 1
# Release the capture
cap.release()
# TODO(Steven): Add a graceful shutdown via a close() method for the Viewer context, though not currently supported in the Rerun API.
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--output-dir",
type=Path,
default=Path("outputs/cam_capture/"),
help="Directory where the capture images are written. A subfolder named with the current date & time will be created inside it for each capture.",
)
parser.add_argument(
"--fps",
type=int,
default=30,
help="Frames Per Second of the capture.",
)
parser.add_argument(
"--width",
type=int,
default=1280,
help="Width of the captured images.",
)
parser.add_argument(
"--height",
type=int,
default=720,
help="Height of the captured images.",
)
parser.add_argument(
"--duration",
type=int,
default=20,
help="Duration in seconds for which the video stream should be captured.",
)
args = parser.parse_args()
display_and_save_video_stream(**vars(args))
+43 -48
View File
@@ -21,11 +21,13 @@ See the provided README.md or run `python benchmark/video/run_video_benchmark.py
import argparse import argparse
import datetime as dt import datetime as dt
import itertools
import random import random
import shutil import shutil
from collections import OrderedDict from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path from pathlib import Path
from threading import Lock
import einops import einops
import numpy as np import numpy as np
@@ -35,13 +37,13 @@ import torch
from skimage.metrics import mean_squared_error, peak_signal_noise_ratio, structural_similarity from skimage.metrics import mean_squared_error, peak_signal_noise_ratio, structural_similarity
from tqdm import tqdm from tqdm import tqdm
from benchmarks.video.benchmark import TimeBenchmark
from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.video_utils import ( from lerobot.datasets.video_utils import (
decode_video_frames_torchvision, decode_video_frames,
encode_video_frames, encode_video_frames,
) )
from lerobot.utils.constants import OBS_IMAGE from lerobot.utils.constants import OBS_IMAGE
from lerobot.utils.utils import TimerManager
BASE_ENCODING = OrderedDict( BASE_ENCODING = OrderedDict(
[ [
@@ -86,7 +88,7 @@ def load_original_frames(imgs_dir: Path, timestamps: list[float], fps: int) -> t
frames = [] frames = []
for ts in timestamps: for ts in timestamps:
idx = int(ts * fps) idx = int(ts * fps)
frame = PIL.Image.open(imgs_dir / f"frame_{idx:06d}.png") frame = PIL.Image.open(imgs_dir / f"frame-{idx:06d}.png")
frame = torch.from_numpy(np.array(frame)) frame = torch.from_numpy(np.array(frame))
frame = frame.type(torch.float32) / 255 frame = frame.type(torch.float32) / 255
frame = einops.rearrange(frame, "h w c -> c h w") frame = einops.rearrange(frame, "h w c -> c h w")
@@ -97,21 +99,21 @@ def load_original_frames(imgs_dir: Path, timestamps: list[float], fps: int) -> t
def save_decoded_frames( def save_decoded_frames(
imgs_dir: Path, save_dir: Path, frames: torch.Tensor, timestamps: list[float], fps: int imgs_dir: Path, save_dir: Path, frames: torch.Tensor, timestamps: list[float], fps: int
) -> None: ) -> None:
if save_dir.exists() and len(list(save_dir.glob("frame_*.png"))) == len(timestamps): if save_dir.exists() and len(list(save_dir.glob("frame-*.png"))) == len(timestamps):
return return
save_dir.mkdir(parents=True, exist_ok=True) save_dir.mkdir(parents=True, exist_ok=True)
for i, ts in enumerate(timestamps): for i, ts in enumerate(timestamps):
idx = int(ts * fps) idx = int(ts * fps)
frame_hwc = (frames[i].permute((1, 2, 0)) * 255).type(torch.uint8).cpu().numpy() frame_hwc = (frames[i].permute((1, 2, 0)) * 255).type(torch.uint8).cpu().numpy()
PIL.Image.fromarray(frame_hwc).save(save_dir / f"frame_{idx:06d}_decoded.png") PIL.Image.fromarray(frame_hwc).save(save_dir / f"frame-{idx:06d}_decoded.png")
shutil.copyfile(imgs_dir / f"frame_{idx:06d}.png", save_dir / f"frame_{idx:06d}_original.png") shutil.copyfile(imgs_dir / f"frame-{idx:06d}.png", save_dir / f"frame-{idx:06d}_original.png")
def save_first_episode(imgs_dir: Path, dataset: LeRobotDataset) -> None: def save_first_episode(imgs_dir: Path, dataset: LeRobotDataset) -> None:
episode_index = 0 episode_index = 0
ep_num_images = dataset.meta.episodes["length"][episode_index] ep_num_images = dataset.meta.episodes["length"][episode_index]
if imgs_dir.exists() and len(list(imgs_dir.glob("frame_*.png"))) == ep_num_images: if imgs_dir.exists() and len(list(imgs_dir.glob("frame-*.png"))) == ep_num_images:
return return
imgs_dir.mkdir(parents=True, exist_ok=True) imgs_dir.mkdir(parents=True, exist_ok=True)
@@ -125,7 +127,7 @@ def save_first_episode(imgs_dir: Path, dataset: LeRobotDataset) -> None:
tqdm(imgs_dataset, desc=f"saving {dataset.repo_id} first episode images", leave=False) tqdm(imgs_dataset, desc=f"saving {dataset.repo_id} first episode images", leave=False)
): ):
img = item[img_keys[0]] img = item[img_keys[0]]
img.save(str(imgs_dir / f"frame_{i:06d}.png"), quality=100) img.save(str(imgs_dir / f"frame-{i:06d}.png"), quality=100)
if i >= ep_num_images - 1: if i >= ep_num_images - 1:
break break
@@ -149,18 +151,6 @@ def sample_timestamps(timestamps_mode: str, ep_num_images: int, fps: int) -> lis
return [idx / fps for idx in frame_indexes] return [idx / fps for idx in frame_indexes]
def decode_video_frames(
video_path: str,
timestamps: list[float],
tolerance_s: float,
backend: str,
) -> torch.Tensor:
if backend in ["pyav", "video_reader"]:
return decode_video_frames_torchvision(video_path, timestamps, tolerance_s, backend)
else:
raise NotImplementedError(backend)
def benchmark_decoding( def benchmark_decoding(
imgs_dir: Path, imgs_dir: Path,
video_path: Path, video_path: Path,
@@ -172,8 +162,8 @@ def benchmark_decoding(
num_workers: int = 4, num_workers: int = 4,
save_frames: bool = False, save_frames: bool = False,
) -> dict: ) -> dict:
def process_sample(sample: int): def process_sample(sample: int, lock: Lock):
time_benchmark = TimeBenchmark() time_benchmark = TimerManager(log=False)
timestamps = sample_timestamps(timestamps_mode, ep_num_images, fps) timestamps = sample_timestamps(timestamps_mode, ep_num_images, fps)
num_frames = len(timestamps) num_frames = len(timestamps)
result = { result = {
@@ -182,13 +172,13 @@ def benchmark_decoding(
"mse_values": [], "mse_values": [],
} }
with time_benchmark: with time_benchmark, lock:
frames = decode_video_frames(video_path, timestamps=timestamps, tolerance_s=5e-1, backend=backend) frames = decode_video_frames(video_path, timestamps=timestamps, tolerance_s=5e-1, backend=backend)
result["load_time_video_ms"] = time_benchmark.result_ms / num_frames result["load_time_video_ms"] = (time_benchmark.last * 1000) / num_frames
with time_benchmark: with time_benchmark:
original_frames = load_original_frames(imgs_dir, timestamps, fps) original_frames = load_original_frames(imgs_dir, timestamps, fps)
result["load_time_images_ms"] = time_benchmark.result_ms / num_frames result["load_time_images_ms"] = (time_benchmark.last * 1000) / num_frames
frames_np, original_frames_np = frames.numpy(), original_frames.numpy() frames_np, original_frames_np = frames.numpy(), original_frames.numpy()
for i in range(num_frames): for i in range(num_frames):
@@ -215,8 +205,10 @@ def benchmark_decoding(
# A sample is a single set of decoded frames specified by timestamps_mode (e.g. a single frame, 2 frames, etc.). # A sample is a single set of decoded frames specified by timestamps_mode (e.g. a single frame, 2 frames, etc.).
# For each sample, we record metrics (loading time and quality metrics) which are then averaged over all samples. # For each sample, we record metrics (loading time and quality metrics) which are then averaged over all samples.
# As these samples are independent, we run them in parallel threads to speed up the benchmark. # As these samples are independent, we run them in parallel threads to speed up the benchmark.
# Use a single shared lock for all worker threads
shared_lock = Lock()
with ThreadPoolExecutor(max_workers=num_workers) as executor: with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(process_sample, i) for i in range(num_samples)] futures = [executor.submit(process_sample, i, shared_lock) for i in range(num_samples)]
for future in tqdm(as_completed(futures), total=num_samples, desc="samples", leave=False): for future in tqdm(as_completed(futures), total=num_samples, desc="samples", leave=False):
result = future.result() result = future.result()
load_times_video_ms.append(result["load_time_video_ms"]) load_times_video_ms.append(result["load_time_video_ms"])
@@ -358,24 +350,27 @@ def main(
imgs_dir = output_dir / "images" / dataset.repo_id.replace("/", "_") imgs_dir = output_dir / "images" / dataset.repo_id.replace("/", "_")
# We only use the first episode # We only use the first episode
save_first_episode(imgs_dir, dataset) save_first_episode(imgs_dir, dataset)
for key, values in tqdm(encoding_benchmarks.items(), desc="encodings (g, crf)", leave=False): for duet in [
for value in tqdm(values, desc=f"encodings ({key})", leave=False): dict(zip(encoding_benchmarks.keys(), unique_combination, strict=False))
encoding_cfg = BASE_ENCODING.copy() for unique_combination in itertools.product(*encoding_benchmarks.values())
encoding_cfg["vcodec"] = video_codec ]:
encoding_cfg["pix_fmt"] = pixel_format encoding_cfg = BASE_ENCODING.copy()
encoding_cfg["vcodec"] = video_codec
encoding_cfg["pix_fmt"] = pixel_format
for key, value in duet.items():
encoding_cfg[key] = value encoding_cfg[key] = value
args_path = Path("_".join(str(value) for value in encoding_cfg.values())) args_path = Path("_".join(str(value) for value in encoding_cfg.values()))
video_path = output_dir / "videos" / args_path / f"{repo_id.replace('/', '_')}.mp4" video_path = output_dir / "videos" / args_path / f"{repo_id.replace('/', '_')}.mp4"
benchmark_table += benchmark_encoding_decoding( benchmark_table += benchmark_encoding_decoding(
dataset, dataset,
video_path, video_path,
imgs_dir, imgs_dir,
encoding_cfg, encoding_cfg,
decoding_benchmarks, decoding_benchmarks,
num_samples, num_samples,
num_workers, num_workers,
save_frames, save_frames,
) )
# Save intermediate results # Save intermediate results
benchmark_df = pd.DataFrame(benchmark_table, columns=headers) benchmark_df = pd.DataFrame(benchmark_table, columns=headers)
@@ -409,9 +404,9 @@ if __name__ == "__main__":
nargs="*", nargs="*",
default=[ default=[
"lerobot/pusht_image", "lerobot/pusht_image",
"aliberts/aloha_mobile_shrimp_image", "lerobot/aloha_mobile_shrimp_image",
"aliberts/paris_street", "lerobot/paris_street",
"aliberts/kitchen", "lerobot/kitchen",
], ],
help="Datasets repo-ids to test against. First episodes only are used. Must be images.", help="Datasets repo-ids to test against. First episodes only are used. Must be images.",
) )
@@ -419,7 +414,7 @@ if __name__ == "__main__":
"--vcodec", "--vcodec",
type=str, type=str,
nargs="*", nargs="*",
default=["libx264", "hevc", "libsvtav1"], default=["h264", "hevc", "libsvtav1"],
help="Video codecs to be tested", help="Video codecs to be tested",
) )
parser.add_argument( parser.add_argument(
@@ -468,7 +463,7 @@ if __name__ == "__main__":
"--backends", "--backends",
type=str, type=str,
nargs="*", nargs="*",
default=["pyav", "video_reader"], default=["torchcodec", "pyav"],
help="Torchvision decoding backend to be tested.", help="Torchvision decoding backend to be tested.",
) )
parser.add_argument( parser.add_argument(