Skip to content

Commit

Permalink
abort tasks for non-async pool shudown
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Volk <[email protected]>
  • Loading branch information
jevolk authored and girlbossceo committed Dec 18, 2024
1 parent 7b8320e commit 4d46df2
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
3 changes: 2 additions & 1 deletion src/database/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ impl Drop for Engine {
fn drop(&mut self) {
const BLOCKING: bool = true;

debug_assert!(!self.pool.close(), "request pool was not closed");
debug!("Closing frontend pool");
self.pool.close();

debug!("Waiting for background tasks to finish...");
self.db.cancel_all_background_work(BLOCKING);
Expand Down
14 changes: 10 additions & 4 deletions src/database/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
mem::take,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Arc, Mutex,
},
};

Expand All @@ -11,7 +11,7 @@ use conduwuit::{debug, debug_warn, defer, err, implement, result::DebugInspect,
use futures::{channel::oneshot, TryFutureExt};
use oneshot::Sender as ResultSender;
use rocksdb::Direction;
use tokio::{sync::Mutex, task::JoinSet};
use tokio::task::JoinSet;

use crate::{keyval::KeyBuf, stream, Handle, Map};

Expand Down Expand Up @@ -79,7 +79,7 @@ pub(crate) async fn new(server: &Arc<Server>, opts: &Opts) -> Result<Arc<Self>>
pub(crate) async fn shutdown(self: &Arc<Self>) {
self.close();

let workers = take(&mut *self.workers.lock().await);
let workers = take(&mut *self.workers.lock().expect("locked"));
debug!(workers = workers.len(), "Waiting for workers to join...");

workers.join_all().await;
Expand All @@ -92,7 +92,13 @@ pub(crate) fn close(&self) -> bool {
return false;
}

let mut workers = take(&mut *self.workers.lock().expect("locked"));
debug!(workers = workers.len(), "Waiting for workers to join...");
workers.abort_all();
drop(workers);

std::thread::yield_now();
debug_assert!(self.queue.is_empty(), "channel is not empty");
debug!(
senders = self.queue.sender_count(),
receivers = self.queue.receiver_count(),
Expand All @@ -104,7 +110,7 @@ pub(crate) fn close(&self) -> bool {

#[implement(Pool)]
async fn spawn_until(self: &Arc<Self>, recv: Receiver<Cmd>, max: usize) -> Result {
let mut workers = self.workers.lock().await;
let mut workers = self.workers.lock().expect("locked");
while workers.len() < max {
self.spawn_one(&mut workers, recv.clone())?;
}
Expand Down

0 comments on commit 4d46df2

Please sign in to comment.