From 9bd844a3b99d5d9e2b68898a6bd104fa0bcd543a Mon Sep 17 00:00:00 2001 From: Jash Shah Date: Mon, 13 Apr 2026 07:25:42 -0700 Subject: [PATCH] fix(rl): ensure queue and process cleanup on abnormal exit (#3063) Wrap the main execution in actor_cli and start_learner_threads with try/finally so that queues are closed and processes are joined even when an unhandled exception occurs. Previously, exceptions in act_with_policy or add_actor_information_and_train would skip all cleanup code, leaking GPU/CPU resources. Also sets the shutdown_event on exception so child processes exit gracefully. Fixes #3059 Co-authored-by: Khalil Meftah --- src/lerobot/rl/actor.py | 51 +++++++++++++++++++++------------------ src/lerobot/rl/learner.py | 45 ++++++++++++++++++---------------- 2 files changed, 51 insertions(+), 45 deletions(-) diff --git a/src/lerobot/rl/actor.py b/src/lerobot/rl/actor.py index 0d785bde3..588adffac 100644 --- a/src/lerobot/rl/actor.py +++ b/src/lerobot/rl/actor.py @@ -175,33 +175,36 @@ def actor_cli(cfg: TrainRLServerPipelineConfig): interactions_process.start() receive_policy_process.start() - act_with_policy( - cfg=cfg, - shutdown_event=shutdown_event, - parameters_queue=parameters_queue, - transitions_queue=transitions_queue, - interactions_queue=interactions_queue, - ) - logging.info("[ACTOR] Policy process joined") + try: + act_with_policy( + cfg=cfg, + shutdown_event=shutdown_event, + parameters_queue=parameters_queue, + transitions_queue=transitions_queue, + interactions_queue=interactions_queue, + ) + logging.info("[ACTOR] Policy loop finished") + except Exception: + logging.exception("[ACTOR] Unhandled exception in act_with_policy") + shutdown_event.set() + finally: + logging.info("[ACTOR] Closing queues") + transitions_queue.close() + interactions_queue.close() + parameters_queue.close() - logging.info("[ACTOR] Closing queues") - transitions_queue.close() - interactions_queue.close() - parameters_queue.close() + transitions_process.join() + logging.info("[ACTOR] Transitions process joined") + interactions_process.join() + logging.info("[ACTOR] Interactions process joined") + receive_policy_process.join() + logging.info("[ACTOR] Receive policy process joined") - transitions_process.join() - logging.info("[ACTOR] Transitions process joined") - interactions_process.join() - logging.info("[ACTOR] Interactions process joined") - receive_policy_process.join() - logging.info("[ACTOR] Receive policy process joined") + transitions_queue.cancel_join_thread() + interactions_queue.cancel_join_thread() + parameters_queue.cancel_join_thread() - logging.info("[ACTOR] join queues") - transitions_queue.cancel_join_thread() - interactions_queue.cancel_join_thread() - parameters_queue.cancel_join_thread() - - logging.info("[ACTOR] queues closed") + logging.info("[ACTOR] Cleanup complete") # Core algorithm functions diff --git a/src/lerobot/rl/learner.py b/src/lerobot/rl/learner.py index 073d9a65f..d1207421b 100644 --- a/src/lerobot/rl/learner.py +++ b/src/lerobot/rl/learner.py @@ -218,30 +218,33 @@ def start_learner_threads( ) communication_process.start() - add_actor_information_and_train( - cfg=cfg, - wandb_logger=wandb_logger, - shutdown_event=shutdown_event, - transition_queue=transition_queue, - interaction_message_queue=interaction_message_queue, - parameters_queue=parameters_queue, - ) - logging.info("[LEARNER] Training process stopped") + try: + add_actor_information_and_train( + cfg=cfg, + wandb_logger=wandb_logger, + shutdown_event=shutdown_event, + transition_queue=transition_queue, + interaction_message_queue=interaction_message_queue, + parameters_queue=parameters_queue, + ) + logging.info("[LEARNER] Training process stopped") + except Exception: + logging.exception("[LEARNER] Unhandled exception in training loop") + shutdown_event.set() + finally: + logging.info("[LEARNER] Closing queues") + transition_queue.close() + interaction_message_queue.close() + parameters_queue.close() - logging.info("[LEARNER] Closing queues") - transition_queue.close() - interaction_message_queue.close() - parameters_queue.close() + communication_process.join() + logging.info("[LEARNER] Communication process joined") - communication_process.join() - logging.info("[LEARNER] Communication process joined") + transition_queue.cancel_join_thread() + interaction_message_queue.cancel_join_thread() + parameters_queue.cancel_join_thread() - logging.info("[LEARNER] join queues") - transition_queue.cancel_join_thread() - interaction_message_queue.cancel_join_thread() - parameters_queue.cancel_join_thread() - - logging.info("[LEARNER] queues closed") + logging.info("[LEARNER] Cleanup complete") # Core algorithm functions