From 716bd0258df3447215437a29eea93a1e2e889c34 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 17 Oct 2023 14:41:50 +0200 Subject: [PATCH] use tokio::sync::Mutex-based guard for to keep hold of an open submit side (#18) ... instead of a closure. This required refactoring of how shutdown works, but that's also a good win. On top of #17 --- Cargo.lock | 24 --- tokio-epoll-uring/Cargo.toml | 1 - tokio-epoll-uring/src/system/completion.rs | 49 +++-- tokio-epoll-uring/src/system/lifecycle.rs | 39 ++-- .../src/system/lifecycle/handle.rs | 45 +---- tokio-epoll-uring/src/system/slots.rs | 15 +- tokio-epoll-uring/src/system/submission.rs | 119 ++++++++---- .../src/system/submission/op_fut.rs | 178 ++++++++---------- .../system/test_util/shared_system_handle.rs | 2 +- 9 files changed, 221 insertions(+), 251 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6c55d59..5f78f5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,18 +119,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "auto_enums" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd4ba50b181a898ce52142184e3a46641002b3b190bf5ef827eb3c578fad4b70" -dependencies = [ - "derive_utils", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -499,17 +487,6 @@ dependencies = [ "serde", ] -[[package]] -name = "derive_utils" -version = "0.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9abcad25e9720609ccb3dcdb795d845e37d8ce34183330a9f48b03a1a71c8e21" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "either" version = "1.9.0" @@ -1633,7 +1610,6 @@ name = "tokio-epoll-uring" version = "0.1.0" dependencies = [ "assert-panic", - "auto_enums", "futures", "io-uring 0.6.0", "nix 0.26.2", diff --git a/tokio-epoll-uring/Cargo.toml b/tokio-epoll-uring/Cargo.toml index 3155af6..8e96817 100644 --- a/tokio-epoll-uring/Cargo.toml +++ b/tokio-epoll-uring/Cargo.toml @@ -8,7 +8,6 @@ repository = "https://github.com/neondatabase/tokio-epoll-uring" license = "MIT OR Apache-2.0" [dependencies] -auto_enums = "0.8.2" futures = "0.3.28" io-uring = "0.6.0" once_cell = "1.18.0" diff --git a/tokio-epoll-uring/src/system/completion.rs b/tokio-epoll-uring/src/system/completion.rs index 3fc4c7b..f022d54 100644 --- a/tokio-epoll-uring/src/system/completion.rs +++ b/tokio-epoll-uring/src/system/completion.rs @@ -7,11 +7,12 @@ use io_uring::CompletionQueue; use tokio::sync::{self, broadcast, mpsc, oneshot}; use tracing::{debug, info, info_span, trace, Instrument}; -use crate::util::oneshot_nonconsuming; +use crate::{system::submission::SubmitSideInner, util::oneshot_nonconsuming}; use super::{ lifecycle::{ShutdownRequest, System}, slots::{self, Slots}, + submission::SubmitSideOpen, }; pub(crate) struct CompletionSide { @@ -63,12 +64,11 @@ pub(crate) struct PollerNewArgs { pub system: System, pub(crate) slots: Slots<{ slots::co_owner::POLLER }>, pub testing: Option, + pub shutdown_rx: oneshot_nonconsuming::Receiver, } impl Poller { - pub(crate) async fn launch( - args: PollerNewArgs, - ) -> oneshot_nonconsuming::SendOnce { + pub(crate) fn launch(args: PollerNewArgs) -> impl std::future::Future + Send { let PollerNewArgs { id, uring_fd, @@ -76,8 +76,8 @@ impl Poller { system, slots: ops, testing, + shutdown_rx, } = args; - let (shutdown_tx, shutdown_rx) = oneshot_nonconsuming::channel(); let poller_task_state = Arc::new(Mutex::new(Poller { id, state: PollerState::RunningInTask(Arc::new(Mutex::new(PollerStateInner { @@ -94,18 +94,19 @@ impl Poller { poller_ready_tx, testing, )); - poller_ready_rx - .await - // TODO make launch fallible and propagate this error - .expect("poller task must not die during startup"); - shutdown_tx + async move { + poller_ready_rx + .await + // TODO make launch fallible and propagate this error + .expect("poller task must not die during startup"); + } } } enum PollerState { RunningInTask(Arc>), RunningInThread(Arc>), - ShuttingDownPreemptible(Arc>, Arc), + ShuttingDownPreemptible(Arc>, Arc), ShuttingDownNoMorePreemptible, ShutDown, } @@ -293,9 +294,8 @@ async fn poller_impl( }; let shutdown_req_shared = match maybe_shutdown_req_shared { None => { - let shutdown_req: ShutdownRequest = tokio::select! { - req = poller_impl_impl(Arc::clone(&inner_shared), preempt_in_epoll) => { req }, - }; + let shutdown_req: ShutdownRequestImpl = + poller_impl_impl(Arc::clone(&inner_shared), preempt_in_epoll).await; let shared = Arc::new(shutdown_req); poller.lock().unwrap().state = PollerState::ShuttingDownPreemptible( Arc::clone(&inner_shared), @@ -319,12 +319,13 @@ async fn poller_impl( } // 1. Prevent new ops from being submitted and wait for all inflight ops to finish. - // `SystemHandleInner::shutdown` already plugged the sumit side & transitioned Ops to `Draining` state. + // `poller_impl_impl` already transitioned `slots` to `Draining` state. // So, all that's left is to wait for pending count to reach 0. loop { { let inner_guard = inner_shared.lock().unwrap(); let mut completion_side_guard = inner_guard.completion_side.lock().unwrap(); + completion_side_guard.slots.transition_to_draining(); let pending_count = completion_side_guard.slots.pending_slot_count(); debug!(pending_count, "waiting for pending operations to complete"); if pending_count == 0 { @@ -393,10 +394,15 @@ async fn poller_impl( })() } +pub(crate) struct ShutdownRequestImpl { + pub(crate) done_tx: Option>, + pub(crate) submit_side_open: SubmitSideOpen, +} + async fn poller_impl_impl( inner: Arc>, mut preempt_in_epoll: Option>>, -) -> ShutdownRequest { +) -> ShutdownRequestImpl { let (uring_fd, completion_side, mut shutdown_rx) = { let mut inner_guard = inner.lock().unwrap(); let PollerStateInner { @@ -436,9 +442,14 @@ async fn poller_impl_impl( } rx = shutdown_rx.recv() => { match rx { - crate::util::oneshot_nonconsuming::RecvResult::FirstRecv(req) => { + crate::util::oneshot_nonconsuming::RecvResult::FirstRecv(ShutdownRequest { done_tx, submit_side_inner }) => { tracing::debug!("got explicit shutdown request"); - return req; + let mut inner = submit_side_inner.lock().await; + let open = match std::mem::replace(&mut *inner, SubmitSideInner::ShutDownInitiated) { + SubmitSideInner::Open(open) => open, + SubmitSideInner::ShutDownInitiated => unreachable!("poller_impl transitions to state ShuttingDownPreemptible when we return a shutdown request, so, it won't call poller_impl_impl again"), + }; + return ShutdownRequestImpl { done_tx, submit_side_open: open }; } crate::util::oneshot_nonconsuming::RecvResult::NotFirstRecv => { panic!("once we observe a shutdown request, we return it and the caller does through with shutdown, without a chance for the executor to intervene") @@ -561,7 +572,7 @@ mod tests { .build() .unwrap(); second_rt.block_on(async move { - let mut shutdown_done_fut = shutdown_done_fut; + let mut shutdown_done_fut = Box::pin(shutdown_done_fut); tokio::select! { // TODO don't rely on timing _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => { } diff --git a/tokio-epoll-uring/src/system/lifecycle.rs b/tokio-epoll-uring/src/system/lifecycle.rs index 9d88b0a..a726ff4 100644 --- a/tokio-epoll-uring/src/system/lifecycle.rs +++ b/tokio-epoll-uring/src/system/lifecycle.rs @@ -8,13 +8,16 @@ pub mod thread_local; use io_uring::{CompletionQueue, SubmissionQueue, Submitter}; -use crate::system::{submission::SubmitSideOpen, RING_SIZE}; +use crate::{ + system::{completion::ShutdownRequestImpl, RING_SIZE}, + util::oneshot_nonconsuming, +}; use super::{ completion::{CompletionSide, Poller, PollerNewArgs, PollerTesting}, lifecycle::handle::SystemHandle, slots::{self, Slots}, - submission::{SubmitSide, SubmitSideNewArgs}, + submission::{SubmitSide, SubmitSideInner, SubmitSideNewArgs}, }; /// A running `tokio_epoll_uring` system. Use [`Self::launch`] to start, then [`SystemHandle`] to interact. @@ -59,12 +62,15 @@ impl System { slots_completion_side, ))); + let (shutdown_tx, shutdown_rx) = oneshot_nonconsuming::channel(); + let submit_side = SubmitSide::new(SubmitSideNewArgs { id, submitter, sq, slots: slots_submit_side, completion_side: Arc::clone(&completion_side), + shutdown_tx, }); let system = System { id, @@ -77,26 +83,27 @@ impl System { system, slots: slots_poller, testing, + shutdown_rx, }); (submit_side, poller_ready_fut) }; - let poller_shutdown_tx = poller_ready_fut.await; + poller_ready_fut.await; - SystemHandle::new(id, submit_side, poller_shutdown_tx) + SystemHandle::new(id, submit_side) } } pub(crate) struct ShutdownRequest { - pub done_tx: tokio::sync::oneshot::Sender<()>, - pub open_state: SubmitSideOpen, + pub done_tx: Option>, + pub submit_side_inner: Arc>, } pub(crate) fn poller_impl_finish_shutdown( system: System, ops: Slots<{ slots::co_owner::POLLER }>, completion_side: Arc>, - req: ShutdownRequest, + shutdown_request: ShutdownRequestImpl, ) { tracing::info!("poller shutdown start"); scopeguard::defer_on_success! {tracing::info!("poller shutdown end")}; @@ -104,15 +111,13 @@ pub(crate) fn poller_impl_finish_shutdown( let System { id: _, split_uring } = { system }; - let ShutdownRequest { - done_tx, - open_state, - } = req; + let ShutdownRequestImpl { + mut done_tx, + submit_side_open, + } = { shutdown_request }; - let (submitter, sq) = open_state.deconstruct(); - let completion_side = Arc::try_unwrap(completion_side) - .ok() - .expect("we plugged the SubmitSide, so, all refs to CompletionSide are gone"); + let (submitter, sq) = submit_side_open.deconstruct(); + let completion_side = Arc::try_unwrap(completion_side).ok().unwrap(); let completion_side = Mutex::into_inner(completion_side).unwrap(); // Unsplit the uring @@ -154,5 +159,7 @@ pub(crate) fn poller_impl_finish_shutdown( // notify about completed shutdown; // ignore send errors, interest may be gone if it's implicit shutdown through SystemHandle::drop - let _ = done_tx.send(()); + if let Some(done_tx) = done_tx.take() { + let _ = done_tx.send(()); + } } diff --git a/tokio-epoll-uring/src/system/lifecycle/handle.rs b/tokio-epoll-uring/src/system/lifecycle/handle.rs index 9cf35d2..7de6d20 100644 --- a/tokio-epoll-uring/src/system/lifecycle/handle.rs +++ b/tokio-epoll-uring/src/system/lifecycle/handle.rs @@ -9,8 +9,6 @@ use crate::{ system::submission::{op_fut::execute_op, SubmitSide}, }; -use super::ShutdownRequest; - /// Owned handle to the [`System`](crate::System) created by [`System::launch`](crate::System::launch). /// /// The only use of this handle is to shut down the [`System`](crate::System). @@ -28,31 +26,12 @@ struct SystemHandleInner { #[allow(dead_code)] pub(super) id: usize, pub(crate) submit_side: SubmitSide, - pub(super) shutdown_tx: crate::util::oneshot_nonconsuming::SendOnce, -} - -impl Drop for SystemHandle { - fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - let wait_shutdown_done = inner.shutdown(); - // we don't care about the result - drop(wait_shutdown_done); - } - } } impl SystemHandle { - pub(crate) fn new( - id: usize, - submit_side: SubmitSide, - shutdown_tx: crate::util::oneshot_nonconsuming::SendOnce, - ) -> Self { + pub(crate) fn new(id: usize, submit_side: SubmitSide) -> Self { SystemHandle { - inner: Some(SystemHandleInner { - id, - submit_side, - shutdown_tx, - }), + inner: Some(SystemHandleInner { id, submit_side }), } } @@ -78,7 +57,7 @@ impl SystemHandle { /// the shutdown procedure makes sure to continue in a new `std::thread`. /// /// So, it is safe to drop the tokio runtime on which the poller task runs. - pub fn initiate_shutdown(mut self) -> impl std::future::Future + Send + Unpin { + pub fn initiate_shutdown(mut self) -> impl std::future::Future + Send { let inner = self .inner .take() @@ -107,22 +86,8 @@ impl std::future::Future for WaitShutdownFut { } impl SystemHandleInner { - fn shutdown(self) -> impl std::future::Future + Send + Unpin { - let SystemHandleInner { - id: _, - submit_side, - shutdown_tx, - } = self; - let (done_tx, done_rx) = tokio::sync::oneshot::channel(); - let req = ShutdownRequest { - done_tx, - open_state: submit_side.plug(), - }; - shutdown_tx - .send(req) - .ok() - .expect("implementation error: poller task must not die before SystemHandle"); - WaitShutdownFut { done_rx } + fn shutdown(self) -> impl std::future::Future + Send { + self.submit_side.shutdown() } } diff --git a/tokio-epoll-uring/src/system/slots.rs b/tokio-epoll-uring/src/system/slots.rs index 0318555..35df06c 100644 --- a/tokio-epoll-uring/src/system/slots.rs +++ b/tokio-epoll-uring/src/system/slots.rs @@ -294,8 +294,8 @@ impl SlotsInner { } } -impl Slots<{ co_owner::SUBMIT_SIDE }> { - pub(super) fn set_draining(&self) { +impl Slots<{ co_owner::COMPLETION_SIDE }> { + pub(super) fn transition_to_draining(&self) { let mut inner_guard = self.inner.lock().unwrap(); match &mut inner_guard.state { SlotsInnerState::Open { @@ -306,9 +306,7 @@ impl Slots<{ co_owner::SUBMIT_SIDE }> { // thereby making all of the op futures return with a shutdown error inner_guard.state = SlotsInnerState::Draining; } - SlotsInnerState::Draining => { - panic!("implementation error: must only call set_draining once") - } + SlotsInnerState::Draining => {} } } } @@ -387,15 +385,14 @@ impl Slots<{ co_owner::SUBMIT_SIDE }> { } impl SlotHandle { - pub(crate) fn use_for_op( + pub(crate) fn use_for_op( self, mut op: O, do_submit: S, - do_submit_arg: &mut T, ) -> impl std::future::Future>)> where O: Op + Send + 'static, - S: Fn(&mut T, io_uring::squeue::Entry), + S: FnOnce(io_uring::squeue::Entry), { let sqe = op.make_sqe(); let sqe = sqe.user_data(u64::try_from(self.idx).unwrap()); @@ -418,7 +415,7 @@ impl SlotHandle { }); }; - do_submit(do_submit_arg, sqe); + do_submit(sqe); futures::future::Either::Right(self.wait_for_completion(op)) } diff --git a/tokio-epoll-uring/src/system/submission.rs b/tokio-epoll-uring/src/system/submission.rs index 0d5c17a..85e93f4 100644 --- a/tokio-epoll-uring/src/system/submission.rs +++ b/tokio-epoll-uring/src/system/submission.rs @@ -1,11 +1,15 @@ pub(crate) mod op_fut; -use std::sync::{Arc, Mutex, Weak}; +use std::{ + ops::{Deref, DerefMut}, + sync::{Arc, Mutex, Weak}, +}; use io_uring::{SubmissionQueue, Submitter}; use super::{ completion::CompletionSide, + lifecycle::ShutdownRequest, slots::{self, Slots}, }; @@ -15,11 +19,14 @@ pub(crate) struct SubmitSideNewArgs { pub(crate) sq: SubmissionQueue<'static>, pub(crate) slots: Slots<{ slots::co_owner::SUBMIT_SIDE }>, pub(crate) completion_side: Arc>, + pub(crate) shutdown_tx: crate::util::oneshot_nonconsuming::SendOnce, } pub(crate) struct SubmitSide { // This is the only long-lived strong reference to the `SubmitSideInner`. - inner: Arc>, + inner: Arc>, + shutdown_tx: Option>, + shutdown_done_tx: Option>, } impl SubmitSide { @@ -30,19 +37,54 @@ impl SubmitSide { sq, slots: ops, completion_side, + shutdown_tx, } = args; SubmitSide { - inner: Arc::new(Mutex::new(SubmitSideInner::Open(SubmitSideOpen { - id, - submitter, - sq, - slots: ops, - completion_side: Arc::clone(&completion_side), - }))), + inner: Arc::new(tokio::sync::Mutex::new(SubmitSideInner::Open( + SubmitSideOpen { + id, + submitter, + sq, + slots: ops, + completion_side: Arc::clone(&completion_side), + }, + ))), + shutdown_tx: Some(shutdown_tx), + shutdown_done_tx: None, + } + } +} + +impl SubmitSide { + pub fn shutdown(mut self) -> impl std::future::Future + Send { + let (done_tx, done_rx) = tokio::sync::oneshot::channel(); + let prev = self.shutdown_done_tx.replace(done_tx); + assert!(prev.is_none()); + drop(self); // sends the shutdown request + async { + done_rx + .await + // TODO: return the error? + .unwrap() } } } +impl Drop for SubmitSide { + fn drop(&mut self) { + self.shutdown_tx + .take() + .unwrap() + .send(ShutdownRequest { + done_tx: self.shutdown_done_tx.take(), + submit_side_inner: Arc::clone(&self.inner), + }) + // TODO: can we just ignore the error? + .ok() + .unwrap(); + } +} + impl SubmitSideOpen { pub(crate) fn submit_raw( &mut self, @@ -62,38 +104,46 @@ impl SubmitSideOpen { } #[derive(Clone)] -pub struct SubmitSideWeak(Weak>); +pub struct SubmitSideWeak(Weak>); impl SubmitSideWeak { - pub(crate) fn with_submit_side_open(&self, f: F) -> R - where - F: FnOnce(Option<&mut SubmitSideOpen>) -> R, - { - let submit_side = match self.0.upgrade() { - Some(submit_side) => submit_side, - None => return f(None), + pub(crate) async fn upgrade_to_open(&self) -> Option { + let inner: Arc> = match self.0.upgrade() { + Some(inner) => inner, + None => return None, }; - SubmitSide { inner: submit_side }.with_submit_side_open(f) + let mut inner_guard = inner.lock_owned().await; + match &mut *inner_guard { + SubmitSideInner::Open(_) => Some(SubmitSideOpenGuard(inner_guard)), + SubmitSideInner::ShutDownInitiated => None, + } } } -impl SubmitSide { - pub(crate) fn with_submit_side_open(&self, f: F) -> R - where - F: FnOnce(Option<&mut SubmitSideOpen>) -> R, - { - let mut inner_guard = self.inner.lock().unwrap(); - let submit_side_open = match &mut *inner_guard { +pub(crate) struct SubmitSideOpenGuard(tokio::sync::OwnedMutexGuard); + +impl Deref for SubmitSideOpenGuard { + type Target = SubmitSideOpen; + fn deref(&self) -> &Self::Target { + match &*self.0 { SubmitSideInner::Open(open) => open, - SubmitSideInner::Plugged => return f(None), - }; - f(Some(submit_side_open)) + SubmitSideInner::ShutDownInitiated => unreachable!(), + } + } +} + +impl DerefMut for SubmitSideOpenGuard { + fn deref_mut(&mut self) -> &mut Self::Target { + match &mut *self.0 { + SubmitSideInner::Open(open) => open, + SubmitSideInner::ShutDownInitiated => unreachable!(), + } } } pub(crate) enum SubmitSideInner { Open(SubmitSideOpen), - Plugged, + ShutDownInitiated, } pub(crate) struct SubmitSideOpen { @@ -109,17 +159,6 @@ impl SubmitSide { pub(crate) fn weak(&self) -> SubmitSideWeak { SubmitSideWeak(Arc::downgrade(&self.inner)) } - pub(crate) fn plug(self) -> SubmitSideOpen { - let mut inner = self.inner.lock().unwrap(); - let cur = std::mem::replace(&mut *inner, SubmitSideInner::Plugged); - match cur { - SubmitSideInner::Open(open) => { - open.slots.set_draining(); - open - } - SubmitSideInner::Plugged => unreachable!(), - } - } } impl SubmitSideOpen { diff --git a/tokio-epoll-uring/src/system/submission/op_fut.rs b/tokio-epoll-uring/src/system/submission/op_fut.rs index da5ddad..9edaa25 100644 --- a/tokio-epoll-uring/src/system/submission/op_fut.rs +++ b/tokio-epoll-uring/src/system/submission/op_fut.rs @@ -12,14 +12,12 @@ pub trait Op: crate::sealed::Sealed + Sized + Send + 'static { fn make_sqe(&mut self) -> io_uring::squeue::Entry; } -use futures::future; - use crate::system::{ completion::ProcessCompletionsCause, slots::{self, SlotHandle}, }; -use super::SubmitSideWeak; +use super::{SubmitSideOpenGuard, SubmitSideWeak}; #[derive(Debug, thiserror::Error)] pub enum SystemError { @@ -57,110 +55,88 @@ where // FIXME: probably dont need the unpin O: Op + Send + 'static + Unpin, { - let submit_side_weak = submit_side.clone(); - - submit_side.with_submit_side_open(|submit_side| { - let open: &mut super::SubmitSideOpen = match submit_side { - Some(submit_side) => submit_side, - None => return Fut::A(async move {( - op.on_failed_submission(), - Err(Error::System(SystemError::SystemShuttingDown)), - )}), - }; - - let do_submit = |submit_side: &mut super::SubmitSideOpen, sqe|{ - if submit_side.submit_raw(sqe).is_err() { - // TODO: DESIGN: io_uring can deal have more ops inflight than the SQ. - // So, we could just submit_and_wait here. But, that'd prevent the - // current executor thread from making progress on other tasks. - // - // So, for now, keep SQ size == inflight ops size == Slots size. - // This potentially limits throughput if SQ size is chosen too small. - // - // FIXME: why not just async mutex? - unreachable!("the `ops` has same size as the SQ, so, if SQ is full, we wouldn't have been able to get this slot"); - } - - // this allows us to keep the possible guard in cq_guard because the arc lives on stack - #[allow(unused_assignments)] - let mut cq_owned = None; + let open_guard = match submit_side.upgrade_to_open().await { + Some(open) => open, + None => { + return ( + op.on_failed_submission(), + Err(Error::System(SystemError::SystemShuttingDown)), + ); + } + }; + + fn do_submit(mut open_guard: SubmitSideOpenGuard, sqe: io_uring::squeue::Entry) { + if open_guard.submit_raw(sqe).is_err() { + // TODO: DESIGN: io_uring can deal have more ops inflight than the SQ. + // So, we could just submit_and_wait here. But, that'd prevent the + // current executor thread from making progress on other tasks. + // + // So, for now, keep SQ size == inflight ops size == Slots size. + // This potentially limits throughput if SQ size is chosen too small. + // + // FIXME: why not just async mutex? + unreachable!("the `ops` has same size as the SQ, so, if SQ is full, we wouldn't have been able to get this slot"); + } - let cq_guard = if *crate::env_tunables::PROCESS_COMPLETIONS_ON_SUBMIT { - let cq = Arc::clone(&submit_side.completion_side); - cq_owned = Some(cq); - Some(cq_owned.as_ref().expect("we just set it").lock().unwrap()) - } else { - None - }; + // this allows us to keep the possible guard in cq_guard because the arc lives on stack + #[allow(unused_assignments)] + let mut cq_owned = None; - if let Some(mut cq) = cq_guard { - // opportunistically process completion immediately - // TODO do it during ::poll() as well? - // - // FIXME: why are we doing this while holding the SubmitSideOpen - cq.process_completions(ProcessCompletionsCause::Regular); - } + let cq_guard = if *crate::env_tunables::PROCESS_COMPLETIONS_ON_SUBMIT { + let cq = Arc::clone(&open_guard.completion_side); + cq_owned = Some(cq); + Some(cq_owned.as_ref().expect("we just set it").lock().unwrap()) + } else { + None }; + drop(open_guard); // drop it asap to enable timely shutdown + + if let Some(mut cq) = cq_guard { + // opportunistically process completion immediately + // TODO do it during ::poll() as well? + // + // FIXME: why are we doing this while holding the SubmitSideOpen + cq.process_completions(ProcessCompletionsCause::Regular); + } + } - match slot { - Some(slot) => Fut::B({ - slot.use_for_op(op, do_submit, open) - }), - None => { - match open.slots.try_get_slot() { - slots::TryGetSlotResult::Draining => { - Fut::C(async move { (op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown)))}) - }, - slots::TryGetSlotResult::GotSlot(slot) => { - Fut::D(async move { - submit_side_weak.with_submit_side_open(|submit_side| { - let open: &mut super::SubmitSideOpen = match submit_side { - Some(submit_side) => submit_side, - None => return future::Either::Left(async move {(op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown)))}), - }; - future::Either::Right(slot.use_for_op(op, do_submit, open)) - }).await - }) - }, - slots::TryGetSlotResult::NoSlots(later) => { - // All slots are taken and we're waiting in line. - // If enabled, do some opportunistic completion processing to wake up futures that will release ops slots. - // This is in the hope that we'll wake ourselves up. - - if *crate::env_tunables::PROCESS_COMPLETIONS_ON_QUEUE_FULL { - // TODO shouldn't we loop here until we've got a slot? This one-off poll doesn't make much sense. - open.submitter.submit().unwrap(); - open.completion_side - .lock() - .unwrap() - .process_completions(ProcessCompletionsCause::Regular); - } - Fut::E(async move { - let slot = match later.await { - Ok(slot) => slot, - Err(_dropped) => return (op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown))), - }; - submit_side_weak.with_submit_side_open(|submit_side| { - let open: &mut super::SubmitSideOpen = match submit_side { - Some(submit_side) => submit_side, - None => return future::Either::Left(async move {(op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown)))}), - }; - future::Either::Right(slot.use_for_op(op, do_submit, open)) - }).await - }) + match slot { + Some(slot) => slot.use_for_op(op, |sqe| do_submit(open_guard, sqe)).await, + None => { + match open_guard.slots.try_get_slot() { + slots::TryGetSlotResult::Draining => ( + op.on_failed_submission(), + Err(Error::System(SystemError::SystemShuttingDown)), + ), + slots::TryGetSlotResult::GotSlot(slot) => { + slot.use_for_op(op, |sqe| do_submit(open_guard, sqe)).await + } + slots::TryGetSlotResult::NoSlots(later) => { + // All slots are taken and we're waiting in line. + // If enabled, do some opportunistic completion processing to wake up futures that will release ops slots. + // This is in the hope that we'll wake ourselves up. + + if *crate::env_tunables::PROCESS_COMPLETIONS_ON_QUEUE_FULL { + // TODO shouldn't we loop here until we've got a slot? This one-off poll doesn't make much sense. + open_guard.submitter.submit().unwrap(); + open_guard + .completion_side + .lock() + .unwrap() + .process_completions(ProcessCompletionsCause::Regular); } + let slot = match later.await { + Ok(slot) => slot, + Err(_dropped) => { + return ( + op.on_failed_submission(), + Err(Error::System(SystemError::SystemShuttingDown)), + ) + } + }; + slot.use_for_op(op, |sqe| do_submit(open_guard, sqe)).await } } } - }).await -} - -// Used by `execute_op` to avoid boxing the future returned by the `with_submit_side` closure. -#[auto_enums::enum_derive(Future)] -enum Fut { - A(A), - B(B), - C(C), - D(D), - E(E), + } } diff --git a/tokio-epoll-uring/src/system/test_util/shared_system_handle.rs b/tokio-epoll-uring/src/system/test_util/shared_system_handle.rs index d0f0011..367177b 100644 --- a/tokio-epoll-uring/src/system/test_util/shared_system_handle.rs +++ b/tokio-epoll-uring/src/system/test_util/shared_system_handle.rs @@ -33,7 +33,7 @@ impl SharedSystemHandle { /// For more details, see [`crate::SystemHandle::initiate_shutdown`]. /// /// TODO: change API to return an error, using [`Arc::try_unwrap`] or similar? - pub fn initiate_shutdown(self) -> impl Future + Send + Unpin { + pub fn initiate_shutdown(self) -> impl Future + Send { self.0 .write() .unwrap()