From 4d46df2af5f678bdf1566b9a9509f8b628c58921 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 18 Dec 2024 03:29:42 +0000 Subject: [PATCH] abort tasks for non-async pool shudown Signed-off-by: Jason Volk --- src/database/engine.rs | 3 ++- src/database/pool.rs | 14 ++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/database/engine.rs b/src/database/engine.rs index 63a6087d9..73ea559d9 100644 --- a/src/database/engine.rs +++ b/src/database/engine.rs @@ -347,7 +347,8 @@ impl Drop for Engine { fn drop(&mut self) { const BLOCKING: bool = true; - debug_assert!(!self.pool.close(), "request pool was not closed"); + debug!("Closing frontend pool"); + self.pool.close(); debug!("Waiting for background tasks to finish..."); self.db.cancel_all_background_work(BLOCKING); diff --git a/src/database/pool.rs b/src/database/pool.rs index 65012527d..28eb38bd9 100644 --- a/src/database/pool.rs +++ b/src/database/pool.rs @@ -2,7 +2,7 @@ use std::{ mem::take, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, + Arc, Mutex, }, }; @@ -11,7 +11,7 @@ use conduwuit::{debug, debug_warn, defer, err, implement, result::DebugInspect, use futures::{channel::oneshot, TryFutureExt}; use oneshot::Sender as ResultSender; use rocksdb::Direction; -use tokio::{sync::Mutex, task::JoinSet}; +use tokio::task::JoinSet; use crate::{keyval::KeyBuf, stream, Handle, Map}; @@ -79,7 +79,7 @@ pub(crate) async fn new(server: &Arc, opts: &Opts) -> Result> pub(crate) async fn shutdown(self: &Arc) { self.close(); - let workers = take(&mut *self.workers.lock().await); + let workers = take(&mut *self.workers.lock().expect("locked")); debug!(workers = workers.len(), "Waiting for workers to join..."); workers.join_all().await; @@ -92,7 +92,13 @@ pub(crate) fn close(&self) -> bool { return false; } + let mut workers = take(&mut *self.workers.lock().expect("locked")); + debug!(workers = workers.len(), "Waiting for workers to join..."); + workers.abort_all(); + drop(workers); + std::thread::yield_now(); + debug_assert!(self.queue.is_empty(), "channel is not empty"); debug!( senders = self.queue.sender_count(), receivers = self.queue.receiver_count(), @@ -104,7 +110,7 @@ pub(crate) fn close(&self) -> bool { #[implement(Pool)] async fn spawn_until(self: &Arc, recv: Receiver, max: usize) -> Result { - let mut workers = self.workers.lock().await; + let mut workers = self.workers.lock().expect("locked"); while workers.len() < max { self.spawn_one(&mut workers, recv.clone())?; }