From f0c3bba612b7a3ae12c21b48da922023e400deb0 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 1 Sep 2023 13:45:53 +0200 Subject: [PATCH] eliminate `use_slot()` and thereby avoid double `with_submit_side_open` (#17) --- Cargo.lock | 24 +++++ tokio-epoll-uring/Cargo.toml | 1 + .../src/system/lifecycle/handle.rs | 4 +- tokio-epoll-uring/src/system/slots.rs | 7 +- tokio-epoll-uring/src/system/submission.rs | 37 +------- .../src/system/submission/op_fut.rs | 92 +++++++++++++++---- 6 files changed, 106 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f78f5a..6c55d59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,6 +119,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "auto_enums" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd4ba50b181a898ce52142184e3a46641002b3b190bf5ef827eb3c578fad4b70" +dependencies = [ + "derive_utils", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -487,6 +499,17 @@ dependencies = [ "serde", ] +[[package]] +name = "derive_utils" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9abcad25e9720609ccb3dcdb795d845e37d8ce34183330a9f48b03a1a71c8e21" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "either" version = "1.9.0" @@ -1610,6 +1633,7 @@ name = "tokio-epoll-uring" version = "0.1.0" dependencies = [ "assert-panic", + "auto_enums", "futures", "io-uring 0.6.0", "nix 0.26.2", diff --git a/tokio-epoll-uring/Cargo.toml b/tokio-epoll-uring/Cargo.toml index 8e96817..3155af6 100644 --- a/tokio-epoll-uring/Cargo.toml +++ b/tokio-epoll-uring/Cargo.toml @@ -8,6 +8,7 @@ repository = "https://github.com/neondatabase/tokio-epoll-uring" license = "MIT OR Apache-2.0" [dependencies] +auto_enums = "0.8.2" futures = "0.3.28" io-uring = "0.6.0" once_cell = "1.18.0" diff --git a/tokio-epoll-uring/src/system/lifecycle/handle.rs b/tokio-epoll-uring/src/system/lifecycle/handle.rs index 9d33d51..9cf35d2 100644 --- a/tokio-epoll-uring/src/system/lifecycle/handle.rs +++ b/tokio-epoll-uring/src/system/lifecycle/handle.rs @@ -137,7 +137,7 @@ impl crate::SystemHandle { > { let op = crate::ops::nop::Nop {}; let inner = self.inner.as_ref().unwrap(); - execute_op(op, inner.submit_side.weak()) + execute_op(op, inner.submit_side.weak(), None) } pub fn read( &self, @@ -152,6 +152,6 @@ impl crate::SystemHandle { > { let op = ReadOp { file, offset, buf }; let inner = self.inner.as_ref().unwrap(); - execute_op(op, inner.submit_side.weak()) + execute_op(op, inner.submit_side.weak(), None) } } diff --git a/tokio-epoll-uring/src/system/slots.rs b/tokio-epoll-uring/src/system/slots.rs index 53951c2..0318555 100644 --- a/tokio-epoll-uring/src/system/slots.rs +++ b/tokio-epoll-uring/src/system/slots.rs @@ -33,9 +33,12 @@ use std::{ use tokio::sync::oneshot; use tracing::{debug, trace}; -use crate::system::submission::op_fut::{Error, SystemError}; +use crate::system::submission::op_fut::Error; -use super::{submission::op_fut::Op, RING_SIZE}; +use super::{ + submission::op_fut::{Op, SystemError}, + RING_SIZE, +}; pub(super) mod co_owner { pub const SUBMIT_SIDE: usize = 0; diff --git a/tokio-epoll-uring/src/system/submission.rs b/tokio-epoll-uring/src/system/submission.rs index 27c29a0..0d5c17a 100644 --- a/tokio-epoll-uring/src/system/submission.rs +++ b/tokio-epoll-uring/src/system/submission.rs @@ -3,13 +3,10 @@ pub(crate) mod op_fut; use std::sync::{Arc, Mutex, Weak}; use io_uring::{SubmissionQueue, Submitter}; -use tokio_util::either::Either; - -use crate::system::completion::ProcessCompletionsCause; use super::{ completion::CompletionSide, - slots::{self, SlotHandle, Slots, TryGetSlotResult}, + slots::{self, Slots}, }; pub(crate) struct SubmitSideNewArgs { @@ -64,6 +61,7 @@ impl SubmitSideOpen { } } +#[derive(Clone)] pub struct SubmitSideWeak(Weak>); impl SubmitSideWeak { @@ -77,37 +75,6 @@ impl SubmitSideWeak { }; SubmitSide { inner: submit_side }.with_submit_side_open(f) } - - pub(crate) async fn get_slot(&self) -> Option { - let maybe_fut = self.with_submit_side_open(|submit_side_open| match submit_side_open { - None => None, - Some(open) => match open.slots.try_get_slot() { - TryGetSlotResult::Draining => None, - TryGetSlotResult::GotSlot(slot) => Some(Either::Left(async move { Ok(slot) })), - TryGetSlotResult::NoSlots(later) => { - // All slots are taken and we're waiting in line. - // If enabled, do some opportunistic completion processing to wake up futures that will release ops slots. - // This is in the hope that we'll wake ourselves up. - - if *crate::env_tunables::PROCESS_COMPLETIONS_ON_QUEUE_FULL { - // TODO shouldn't we loop here until we've got a slot? This one-off poll doesn't make much sense. - open.submitter.submit().unwrap(); - open.completion_side - .lock() - .unwrap() - .process_completions(ProcessCompletionsCause::Regular); - } - Some(Either::Right(later)) - } - }, - }); - - if let Some(maybe_fut) = maybe_fut { - maybe_fut.await.ok() - } else { - None - } - } } impl SubmitSide { diff --git a/tokio-epoll-uring/src/system/submission/op_fut.rs b/tokio-epoll-uring/src/system/submission/op_fut.rs index 3e8f76e..da5ddad 100644 --- a/tokio-epoll-uring/src/system/submission/op_fut.rs +++ b/tokio-epoll-uring/src/system/submission/op_fut.rs @@ -12,7 +12,12 @@ pub trait Op: crate::sealed::Sealed + Sized + Send + 'static { fn make_sqe(&mut self) -> io_uring::squeue::Entry; } -use crate::system::completion::ProcessCompletionsCause; +use futures::future; + +use crate::system::{ + completion::ProcessCompletionsCause, + slots::{self, SlotHandle}, +}; use super::SubmitSideWeak; @@ -46,27 +51,21 @@ impl Display for Error { pub(crate) async fn execute_op( op: O, submit_side: SubmitSideWeak, + slot: Option, ) -> (O::Resources, Result>) where // FIXME: probably dont need the unpin O: Op + Send + 'static + Unpin, { - let slot = match submit_side.get_slot().await { - Some(slot) => slot, - None => { - let res = op.on_failed_submission(); - return (res, Err(Error::System(SystemError::SystemShuttingDown))); - } - }; - - let ret = submit_side.with_submit_side_open(|submit_side| { + let submit_side_weak = submit_side.clone(); - let submit_side: &mut super::SubmitSideOpen = match submit_side { + submit_side.with_submit_side_open(|submit_side| { + let open: &mut super::SubmitSideOpen = match submit_side { Some(submit_side) => submit_side, - None => return Err(( + None => return Fut::A(async move {( op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown)), - )), + )}), }; let do_submit = |submit_side: &mut super::SubmitSideOpen, sqe|{ @@ -103,12 +102,65 @@ where } }; - // We return a future here, the subsequent code is going to await it. - Ok(slot.use_for_op(op, do_submit, submit_side)) - }); + match slot { + Some(slot) => Fut::B({ + slot.use_for_op(op, do_submit, open) + }), + None => { + match open.slots.try_get_slot() { + slots::TryGetSlotResult::Draining => { + Fut::C(async move { (op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown)))}) + }, + slots::TryGetSlotResult::GotSlot(slot) => { + Fut::D(async move { + submit_side_weak.with_submit_side_open(|submit_side| { + let open: &mut super::SubmitSideOpen = match submit_side { + Some(submit_side) => submit_side, + None => return future::Either::Left(async move {(op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown)))}), + }; + future::Either::Right(slot.use_for_op(op, do_submit, open)) + }).await + }) + }, + slots::TryGetSlotResult::NoSlots(later) => { + // All slots are taken and we're waiting in line. + // If enabled, do some opportunistic completion processing to wake up futures that will release ops slots. + // This is in the hope that we'll wake ourselves up. - match ret { - Err(with_submit_side_err) => with_submit_side_err, - Ok(use_for_op_ret) => use_for_op_ret.await, - } + if *crate::env_tunables::PROCESS_COMPLETIONS_ON_QUEUE_FULL { + // TODO shouldn't we loop here until we've got a slot? This one-off poll doesn't make much sense. + open.submitter.submit().unwrap(); + open.completion_side + .lock() + .unwrap() + .process_completions(ProcessCompletionsCause::Regular); + } + Fut::E(async move { + let slot = match later.await { + Ok(slot) => slot, + Err(_dropped) => return (op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown))), + }; + submit_side_weak.with_submit_side_open(|submit_side| { + let open: &mut super::SubmitSideOpen = match submit_side { + Some(submit_side) => submit_side, + None => return future::Either::Left(async move {(op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown)))}), + }; + future::Either::Right(slot.use_for_op(op, do_submit, open)) + }).await + }) + } + } + } + } + }).await +} + +// Used by `execute_op` to avoid boxing the future returned by the `with_submit_side` closure. +#[auto_enums::enum_derive(Future)] +enum Fut { + A(A), + B(B), + C(C), + D(D), + E(E), }