diff --git a/benchmarks/audio/run_tactile_benchmark.py b/benchmarks/audio/run_tactile_benchmark.py new file mode 100644 index 000000000..a98e7d970 --- /dev/null +++ b/benchmarks/audio/run_tactile_benchmark.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +from pathlib import Path + +import numpy as np +import soundfile as sf + +from lerobot.microphones.configs import MicrophoneConfig +from lerobot.microphones.touchlab import TouchLabSensorConfig +from lerobot.microphones.utils import ( + async_microphones_start_recording, + async_microphones_stop_recording, + make_microphones_from_configs, +) +from lerobot.utils.robot_utils import ( + busy_wait, +) + + +def main( + sensors_configs: dict[str, MicrophoneConfig], + multiprocessing: bool = False, +): + recording_dir = Path("outputs/tactile_benchmark") + recording_dir.mkdir(parents=True, exist_ok=True) + + # Create microphones + sensors = make_microphones_from_configs(sensors_configs) + + # Connect microphones + for sensor in sensors.values(): + sensor.connect() + + # Create audio chunks + data_chunks = {} + for sensor_key in sensors: + data_chunks.update({sensor_key: []}) + + # Start recording + async_microphones_start_recording( + sensors, + output_files=[recording_dir / f"{sensor_key}_recording.wav" for sensor_key in sensors], + multiprocessing=multiprocessing, + ) + + # Record audio chunks + busy_wait(10.0) + + for sensor_key, sensor in sensors.items(): + data_chunk = sensor.read() + print(f"{sensor_key} - samples {data_chunk.shape[0]}") + data_chunks[sensor_key].append(data_chunk) + + # Stop recording + async_microphones_stop_recording(sensors) + + for sensor_key in sensors: + data_chunks[sensor_key] = np.concatenate(data_chunks[sensor_key], axis=0) + + # Disconnect microphones + for sensor in sensors.values(): + sensor.disconnect() + + for sensor_key in sensors: + data, sample_rate = sf.read(recording_dir / f"{sensor_key}_recording.wav") + print(f"{sensor_key} - samples {data.shape[0]}") + print(f"{sensor_key} - sample rate {sample_rate}") + print(f"{sensor_key} - data {data}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--sensors_ports", + type=str, + nargs="+", + ) + parser.add_argument( + "--sensors_baud_rate", + type=int, + nargs="+", + ) + parser.add_argument( + "--sensors_sample_rate", + type=int, + nargs="+", + ) + parser.add_argument( + "--sensors_channels", + type=int, + nargs="+", + ) + parser.add_argument( + "--multiprocessing", + action="store_true", + ) + + args = vars(parser.parse_args()) + + args["sensors_configs"] = {} + for port, baud_rate, sample_rate, channels in zip( + args["sensors_ports"], + args["sensors_baud_rate"], + args["sensors_sample_rate"], + args["sensors_channels"], + strict=False, + ): + if isinstance(channels, int): + channels = [channels] + sensor_config = TouchLabSensorConfig( + sensor_port=port, + baud_rate=baud_rate, + sample_rate=sample_rate, + channels=channels, + ) + args["sensors_configs"].update({f"sensor_{port}": sensor_config}) + args.pop("sensors_ports") + args.pop("sensors_baud_rate") + args.pop("sensors_sample_rate") + args.pop("sensors_channels") + + main(**args)