From 6a2882f978bbb62d2ae0cc60864755e695ee3ba3 Mon Sep 17 00:00:00 2001 From: CarolinePascal Date: Thu, 3 Apr 2025 11:41:05 +0200 Subject: [PATCH] Adding audio frames reading capability --- src/lerobot/microphones/microphone.py | 120 +++++++++++++----- src/lerobot/robots/hope_jr/hope_jr_arm.py | 7 + src/lerobot/robots/hope_jr/hope_jr_hand.py | 7 + .../robots/koch_follower/koch_follower.py | 7 + src/lerobot/robots/lekiwi/lekiwi.py | 7 + src/lerobot/robots/so_follower/so_follower.py | 7 + src/lerobot/scripts/lerobot_record.py | 3 + src/lerobot/scripts/lerobot_teleoperate.py | 9 +- 8 files changed, 132 insertions(+), 35 deletions(-) diff --git a/src/lerobot/microphones/microphone.py b/src/lerobot/microphones/microphone.py index 9e0942f42..597a73dea 100644 --- a/src/lerobot/microphones/microphone.py +++ b/src/lerobot/microphones/microphone.py @@ -112,16 +112,15 @@ class Microphone: config = MicrophoneConfig(microphone_index=0, sampling_rate=16000, channels=[1], data_type="int16") microphone = Microphone(config) + microphone.connect() microphone.start_recording("some/output/file.wav") ... - microphone.stop_recording() - - # OR - - microphone.start_recording() + audio_readings = ( + microphone.read() + ) # Gets all recorded audio data since the last read or since the beginning of the recording ... microphone.stop_recording() - last_recorded_audio_chunk = microphone.queue.get() + microphone.disconnect() ``` """ @@ -136,12 +135,16 @@ class Microphone: # Input audio stream self.stream = None - # Thread-safe concurrent queue to store the recorded audio - self.queue = Queue() - self.thread = None - self.stop_event = None - self.logs = {} + # Thread-safe concurrent queue to store the recorded/read audio + self.record_queue = Queue() + self.read_queue = Queue() + + # Thread to handle data reading and file writing in a separate thread (safely) + self.record_thread = None + self.record_stop_event = None + + self.logs = {} self.is_connected = False def connect(self) -> None: @@ -200,15 +203,12 @@ class Microphone: def _audio_callback(self, indata, frames, time, status) -> None: if status: logging.warning(status) - # slicing makes copy unnecessary - self.queue.put(indata[:, self.channels]) + # Slicing makes copy unnecessary + # Two separate queues are necessary because .get() also pops the data from the queue + self.record_queue.put(indata[:, self.channels]) + self.read_queue.put(indata[:, self.channels]) - def _read_write_loop(self, output_file: Path) -> None: - output_file = Path(output_file) - if output_file.exists(): - shutil.rmtree( - output_file, - ) + def _record_loop(self, output_file: Path) -> None: with sf.SoundFile( output_file, mode="x", @@ -216,20 +216,73 @@ class Microphone: channels=max(self.channels) + 1, subtype=sf.default_subtype(output_file.suffix[1:]), ) as file: - while not self.stop_event.is_set(): - file.write(self.queue.get()) + while not self.record_stop_event.is_set(): + file.write(self.record_queue.get()) + # self.record_queue.task_done() + + def _read(self) -> np.ndarray: + """ + Gets audio data from the queue and coverts it to a numpy array. + -> PROS : Inherently thread safe, no need to lock the queue, lightweight CPU usage + -> CONS : Reading duration does not scale well with the number of channels and reading duration + """ + try: + audio_readings = self.read_queue.queue + except Queue.Empty: + audio_readings = np.empty((0, len(self.channels))) + else: + # TODO(CarolinePascal): Check if this is the fastest way to do it + self.read_queue = Queue() + with self.read_queue.mutex: + self.read_queue.queue.clear() + # self.read_queue.all_tasks_done.notify_all() + audio_readings = np.array(audio_readings).reshape(-1, len(self.channels)) + + return audio_readings + + def read(self) -> np.ndarray: + if not self.is_connected: + raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.") + if not self.stream.active: + raise RuntimeError(f"Microphone {self.microphone_index} is not recording.") + + start_time = time.perf_counter() + + audio_readings = self._read() + + # log the number of seconds it took to read the audio chunk + self.logs["delta_timestamp_s"] = time.perf_counter() - start_time + + # log the utc time at which the audio chunk was received + self.logs["timestamp_utc"] = capture_timestamp_utc() + + return audio_readings def start_recording(self, output_file: str | None = None) -> None: if not self.is_connected: raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.") - if output_file is not None: - self.stop_event = Event() - self.thread = Thread(target=self._read_write_loop, args=(output_file,)) - self.thread.daemon = True - self.thread.start() + self.read_queue = Queue() + with self.read_queue.mutex: + self.read_queue.queue.clear() + # self.read_queue.all_tasks_done.notify_all() + + self.record_queue = Queue() + with self.record_queue.mutex: + self.record_queue.queue.clear() + # self.record_queue.all_tasks_done.notify_all() + + # Recording case + if output_file is not None: + output_file = Path(output_file) + if output_file.exists(): + output_file.unlink() + + self.record_stop_event = Event() + self.record_thread = Thread(target=self._record_loop, args=(output_file,)) + self.record_thread.daemon = True + self.record_thread.start() - self.logs["start_timestamp"] = capture_timestamp_utc() self.stream.start() def stop_recording(self) -> None: @@ -238,18 +291,17 @@ class Microphone: self.logs["stop_timestamp"] = capture_timestamp_utc() - if self.thread is not None: - self.stop_event.set() - self.thread.join() - self.thread = None - self.stop_event = None + if self.record_thread is not None: + # self.record_queue.join() + self.record_stop_event.set() + self.record_thread.join() + self.record_thread = None + self.record_stop_event = None if self.stream.active: self.stream.stop() # Wait for all buffers to be processed # Remark : stream.abort() flushes the buffers ! - self.logs["duration"] = self.logs["stop_timestamp"] - self.logs["start_timestamp"] - def disconnect(self) -> None: if not self.is_connected: raise DeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.") diff --git a/src/lerobot/robots/hope_jr/hope_jr_arm.py b/src/lerobot/robots/hope_jr/hope_jr_arm.py index 5fd9c4d1d..f3e355119 100644 --- a/src/lerobot/robots/hope_jr/hope_jr_arm.py +++ b/src/lerobot/robots/hope_jr/hope_jr_arm.py @@ -144,6 +144,13 @@ class HopeJrArm(Robot): dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + # Read audio frames from microphones + for mic_key, mic in self.microphones.items(): + start = time.perf_counter() + obs_dict[mic_key] = mic.read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {mic_key}: {dt_ms:.1f}ms") + return obs_dict @check_if_not_connected diff --git a/src/lerobot/robots/hope_jr/hope_jr_hand.py b/src/lerobot/robots/hope_jr/hope_jr_hand.py index 1e5c72b72..5d5329a33 100644 --- a/src/lerobot/robots/hope_jr/hope_jr_hand.py +++ b/src/lerobot/robots/hope_jr/hope_jr_hand.py @@ -175,6 +175,13 @@ class HopeJrHand(Robot): dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + # Read audio frames from microphones + for mic_key, mic in self.microphones.items(): + start = time.perf_counter() + obs_dict[mic_key] = mic.read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {mic_key}: {dt_ms:.1f}ms") + return obs_dict @check_if_not_connected diff --git a/src/lerobot/robots/koch_follower/koch_follower.py b/src/lerobot/robots/koch_follower/koch_follower.py index ece2a047b..5333222e6 100644 --- a/src/lerobot/robots/koch_follower/koch_follower.py +++ b/src/lerobot/robots/koch_follower/koch_follower.py @@ -213,6 +213,13 @@ class KochFollower(Robot): dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + # Read audio frames from microphones + for mic_key, mic in self.microphones.items(): + start = time.perf_counter() + obs_dict[mic_key] = mic.read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {mic_key}: {dt_ms:.1f}ms") + return obs_dict @check_if_not_connected diff --git a/src/lerobot/robots/lekiwi/lekiwi.py b/src/lerobot/robots/lekiwi/lekiwi.py index 52d269939..0e8341713 100644 --- a/src/lerobot/robots/lekiwi/lekiwi.py +++ b/src/lerobot/robots/lekiwi/lekiwi.py @@ -380,6 +380,13 @@ class LeKiwi(Robot): dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + # Read audio frames from microphones + for mic_key, mic in self.microphones.items(): + start = time.perf_counter() + obs_dict[mic_key] = mic.read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {mic_key}: {dt_ms:.1f}ms") + return obs_dict @check_if_not_connected diff --git a/src/lerobot/robots/so_follower/so_follower.py b/src/lerobot/robots/so_follower/so_follower.py index eb8d69498..f4e0197e0 100644 --- a/src/lerobot/robots/so_follower/so_follower.py +++ b/src/lerobot/robots/so_follower/so_follower.py @@ -207,6 +207,13 @@ class SOFollower(Robot): dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + # Read audio frames from microphones + for mic_key, mic in self.microphones.items(): + start = time.perf_counter() + obs_dict[mic_key] = mic.read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {mic_key}: {dt_ms:.1f}ms") + return obs_dict @check_if_not_connected diff --git a/src/lerobot/scripts/lerobot_record.py b/src/lerobot/scripts/lerobot_record.py index ac6f611f5..41d78172e 100644 --- a/src/lerobot/scripts/lerobot_record.py +++ b/src/lerobot/scripts/lerobot_record.py @@ -316,6 +316,9 @@ def record_loop( if dataset is not None: for microphone_key, microphone in robot.microphones.items(): dataset.add_microphone_recording(microphone, microphone_key) + else: + for _, microphone in robot.microphones.items(): + microphone.start_recording() timestamp = 0 start_episode_t = time.perf_counter() diff --git a/src/lerobot/scripts/lerobot_teleoperate.py b/src/lerobot/scripts/lerobot_teleoperate.py index 18d8863d6..99a99b4df 100644 --- a/src/lerobot/scripts/lerobot_teleoperate.py +++ b/src/lerobot/scripts/lerobot_teleoperate.py @@ -145,6 +145,10 @@ def teleop_loop( """ display_len = max(len(key) for key in robot.action_features) + + for _, microphone in robot.microphones.items(): + microphone.start_recording() + start = time.perf_counter() while True: @@ -192,7 +196,10 @@ def teleop_loop( move_cursor_up(1) if duration is not None and time.perf_counter() - start >= duration: - return + break + + for _, microphone in robot.microphones.items(): + microphone.stop_recording() @parser.wrap()