Adding audio frames reading capability

This commit is contained in:
CarolinePascal
2025-04-03 11:41:05 +02:00
parent 8874547353
commit 6a2882f978
8 changed files with 132 additions and 35 deletions
+86 -34
View File
@@ -112,16 +112,15 @@ class Microphone:
config = MicrophoneConfig(microphone_index=0, sampling_rate=16000, channels=[1], data_type="int16")
microphone = Microphone(config)
microphone.connect()
microphone.start_recording("some/output/file.wav")
...
microphone.stop_recording()
# OR
microphone.start_recording()
audio_readings = (
microphone.read()
) # Gets all recorded audio data since the last read or since the beginning of the recording
...
microphone.stop_recording()
last_recorded_audio_chunk = microphone.queue.get()
microphone.disconnect()
```
"""
@@ -136,12 +135,16 @@ class Microphone:
# Input audio stream
self.stream = None
# Thread-safe concurrent queue to store the recorded audio
self.queue = Queue()
self.thread = None
self.stop_event = None
self.logs = {}
# Thread-safe concurrent queue to store the recorded/read audio
self.record_queue = Queue()
self.read_queue = Queue()
# Thread to handle data reading and file writing in a separate thread (safely)
self.record_thread = None
self.record_stop_event = None
self.logs = {}
self.is_connected = False
def connect(self) -> None:
@@ -200,15 +203,12 @@ class Microphone:
def _audio_callback(self, indata, frames, time, status) -> None:
if status:
logging.warning(status)
# slicing makes copy unnecessary
self.queue.put(indata[:, self.channels])
# Slicing makes copy unnecessary
# Two separate queues are necessary because .get() also pops the data from the queue
self.record_queue.put(indata[:, self.channels])
self.read_queue.put(indata[:, self.channels])
def _read_write_loop(self, output_file: Path) -> None:
output_file = Path(output_file)
if output_file.exists():
shutil.rmtree(
output_file,
)
def _record_loop(self, output_file: Path) -> None:
with sf.SoundFile(
output_file,
mode="x",
@@ -216,20 +216,73 @@ class Microphone:
channels=max(self.channels) + 1,
subtype=sf.default_subtype(output_file.suffix[1:]),
) as file:
while not self.stop_event.is_set():
file.write(self.queue.get())
while not self.record_stop_event.is_set():
file.write(self.record_queue.get())
# self.record_queue.task_done()
def _read(self) -> np.ndarray:
"""
Gets audio data from the queue and coverts it to a numpy array.
-> PROS : Inherently thread safe, no need to lock the queue, lightweight CPU usage
-> CONS : Reading duration does not scale well with the number of channels and reading duration
"""
try:
audio_readings = self.read_queue.queue
except Queue.Empty:
audio_readings = np.empty((0, len(self.channels)))
else:
# TODO(CarolinePascal): Check if this is the fastest way to do it
self.read_queue = Queue()
with self.read_queue.mutex:
self.read_queue.queue.clear()
# self.read_queue.all_tasks_done.notify_all()
audio_readings = np.array(audio_readings).reshape(-1, len(self.channels))
return audio_readings
def read(self) -> np.ndarray:
if not self.is_connected:
raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.")
if not self.stream.active:
raise RuntimeError(f"Microphone {self.microphone_index} is not recording.")
start_time = time.perf_counter()
audio_readings = self._read()
# log the number of seconds it took to read the audio chunk
self.logs["delta_timestamp_s"] = time.perf_counter() - start_time
# log the utc time at which the audio chunk was received
self.logs["timestamp_utc"] = capture_timestamp_utc()
return audio_readings
def start_recording(self, output_file: str | None = None) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.")
if output_file is not None:
self.stop_event = Event()
self.thread = Thread(target=self._read_write_loop, args=(output_file,))
self.thread.daemon = True
self.thread.start()
self.read_queue = Queue()
with self.read_queue.mutex:
self.read_queue.queue.clear()
# self.read_queue.all_tasks_done.notify_all()
self.record_queue = Queue()
with self.record_queue.mutex:
self.record_queue.queue.clear()
# self.record_queue.all_tasks_done.notify_all()
# Recording case
if output_file is not None:
output_file = Path(output_file)
if output_file.exists():
output_file.unlink()
self.record_stop_event = Event()
self.record_thread = Thread(target=self._record_loop, args=(output_file,))
self.record_thread.daemon = True
self.record_thread.start()
self.logs["start_timestamp"] = capture_timestamp_utc()
self.stream.start()
def stop_recording(self) -> None:
@@ -238,18 +291,17 @@ class Microphone:
self.logs["stop_timestamp"] = capture_timestamp_utc()
if self.thread is not None:
self.stop_event.set()
self.thread.join()
self.thread = None
self.stop_event = None
if self.record_thread is not None:
# self.record_queue.join()
self.record_stop_event.set()
self.record_thread.join()
self.record_thread = None
self.record_stop_event = None
if self.stream.active:
self.stream.stop() # Wait for all buffers to be processed
# Remark : stream.abort() flushes the buffers !
self.logs["duration"] = self.logs["stop_timestamp"] - self.logs["start_timestamp"]
def disconnect(self) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.")
@@ -144,6 +144,13 @@ class HopeJrArm(Robot):
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
# Read audio frames from microphones
for mic_key, mic in self.microphones.items():
start = time.perf_counter()
obs_dict[mic_key] = mic.read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {mic_key}: {dt_ms:.1f}ms")
return obs_dict
@check_if_not_connected
@@ -175,6 +175,13 @@ class HopeJrHand(Robot):
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
# Read audio frames from microphones
for mic_key, mic in self.microphones.items():
start = time.perf_counter()
obs_dict[mic_key] = mic.read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {mic_key}: {dt_ms:.1f}ms")
return obs_dict
@check_if_not_connected
@@ -213,6 +213,13 @@ class KochFollower(Robot):
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
# Read audio frames from microphones
for mic_key, mic in self.microphones.items():
start = time.perf_counter()
obs_dict[mic_key] = mic.read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {mic_key}: {dt_ms:.1f}ms")
return obs_dict
@check_if_not_connected
+7
View File
@@ -380,6 +380,13 @@ class LeKiwi(Robot):
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
# Read audio frames from microphones
for mic_key, mic in self.microphones.items():
start = time.perf_counter()
obs_dict[mic_key] = mic.read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {mic_key}: {dt_ms:.1f}ms")
return obs_dict
@check_if_not_connected
@@ -207,6 +207,13 @@ class SOFollower(Robot):
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
# Read audio frames from microphones
for mic_key, mic in self.microphones.items():
start = time.perf_counter()
obs_dict[mic_key] = mic.read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {mic_key}: {dt_ms:.1f}ms")
return obs_dict
@check_if_not_connected
+3
View File
@@ -316,6 +316,9 @@ def record_loop(
if dataset is not None:
for microphone_key, microphone in robot.microphones.items():
dataset.add_microphone_recording(microphone, microphone_key)
else:
for _, microphone in robot.microphones.items():
microphone.start_recording()
timestamp = 0
start_episode_t = time.perf_counter()
+8 -1
View File
@@ -145,6 +145,10 @@ def teleop_loop(
"""
display_len = max(len(key) for key in robot.action_features)
for _, microphone in robot.microphones.items():
microphone.start_recording()
start = time.perf_counter()
while True:
@@ -192,7 +196,10 @@ def teleop_loop(
move_cursor_up(1)
if duration is not None and time.perf_counter() - start >= duration:
return
break
for _, microphone in robot.microphones.items():
microphone.stop_recording()
@parser.wrap()