Skip to content

Commit

Permalink
eliminate use_slot() and thereby avoid double `with_submit_side_ope…
Browse files Browse the repository at this point in the history
…n` (#17)
  • Loading branch information
problame authored Sep 1, 2023
1 parent 8110455 commit f0c3bba
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 59 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tokio-epoll-uring/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ repository = "https://github.com/neondatabase/tokio-epoll-uring"
license = "MIT OR Apache-2.0"

[dependencies]
auto_enums = "0.8.2"
futures = "0.3.28"
io-uring = "0.6.0"
once_cell = "1.18.0"
Expand Down
4 changes: 2 additions & 2 deletions tokio-epoll-uring/src/system/lifecycle/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl crate::SystemHandle {
> {
let op = crate::ops::nop::Nop {};
let inner = self.inner.as_ref().unwrap();
execute_op(op, inner.submit_side.weak())
execute_op(op, inner.submit_side.weak(), None)
}
pub fn read<B: IoBufMut + Send>(
&self,
Expand All @@ -152,6 +152,6 @@ impl crate::SystemHandle {
> {
let op = ReadOp { file, offset, buf };
let inner = self.inner.as_ref().unwrap();
execute_op(op, inner.submit_side.weak())
execute_op(op, inner.submit_side.weak(), None)
}
}
7 changes: 5 additions & 2 deletions tokio-epoll-uring/src/system/slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ use std::{
use tokio::sync::oneshot;
use tracing::{debug, trace};

use crate::system::submission::op_fut::{Error, SystemError};
use crate::system::submission::op_fut::Error;

use super::{submission::op_fut::Op, RING_SIZE};
use super::{
submission::op_fut::{Op, SystemError},
RING_SIZE,
};

pub(super) mod co_owner {
pub const SUBMIT_SIDE: usize = 0;
Expand Down
37 changes: 2 additions & 35 deletions tokio-epoll-uring/src/system/submission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ pub(crate) mod op_fut;
use std::sync::{Arc, Mutex, Weak};

use io_uring::{SubmissionQueue, Submitter};
use tokio_util::either::Either;

use crate::system::completion::ProcessCompletionsCause;

use super::{
completion::CompletionSide,
slots::{self, SlotHandle, Slots, TryGetSlotResult},
slots::{self, Slots},
};

pub(crate) struct SubmitSideNewArgs {
Expand Down Expand Up @@ -64,6 +61,7 @@ impl SubmitSideOpen {
}
}

#[derive(Clone)]
pub struct SubmitSideWeak(Weak<Mutex<SubmitSideInner>>);

impl SubmitSideWeak {
Expand All @@ -77,37 +75,6 @@ impl SubmitSideWeak {
};
SubmitSide { inner: submit_side }.with_submit_side_open(f)
}

pub(crate) async fn get_slot(&self) -> Option<SlotHandle> {
let maybe_fut = self.with_submit_side_open(|submit_side_open| match submit_side_open {
None => None,
Some(open) => match open.slots.try_get_slot() {
TryGetSlotResult::Draining => None,
TryGetSlotResult::GotSlot(slot) => Some(Either::Left(async move { Ok(slot) })),
TryGetSlotResult::NoSlots(later) => {
// All slots are taken and we're waiting in line.
// If enabled, do some opportunistic completion processing to wake up futures that will release ops slots.
// This is in the hope that we'll wake ourselves up.

if *crate::env_tunables::PROCESS_COMPLETIONS_ON_QUEUE_FULL {
// TODO shouldn't we loop here until we've got a slot? This one-off poll doesn't make much sense.
open.submitter.submit().unwrap();
open.completion_side
.lock()
.unwrap()
.process_completions(ProcessCompletionsCause::Regular);
}
Some(Either::Right(later))
}
},
});

if let Some(maybe_fut) = maybe_fut {
maybe_fut.await.ok()
} else {
None
}
}
}

impl SubmitSide {
Expand Down
92 changes: 72 additions & 20 deletions tokio-epoll-uring/src/system/submission/op_fut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ pub trait Op: crate::sealed::Sealed + Sized + Send + 'static {
fn make_sqe(&mut self) -> io_uring::squeue::Entry;
}

use crate::system::completion::ProcessCompletionsCause;
use futures::future;

use crate::system::{
completion::ProcessCompletionsCause,
slots::{self, SlotHandle},
};

use super::SubmitSideWeak;

Expand Down Expand Up @@ -46,27 +51,21 @@ impl<T: Display> Display for Error<T> {
pub(crate) async fn execute_op<O>(
op: O,
submit_side: SubmitSideWeak,
slot: Option<SlotHandle>,
) -> (O::Resources, Result<O::Success, Error<O::Error>>)
where
// FIXME: probably dont need the unpin
O: Op + Send + 'static + Unpin,
{
let slot = match submit_side.get_slot().await {
Some(slot) => slot,
None => {
let res = op.on_failed_submission();
return (res, Err(Error::System(SystemError::SystemShuttingDown)));
}
};

let ret = submit_side.with_submit_side_open(|submit_side| {
let submit_side_weak = submit_side.clone();

let submit_side: &mut super::SubmitSideOpen = match submit_side {
submit_side.with_submit_side_open(|submit_side| {
let open: &mut super::SubmitSideOpen = match submit_side {
Some(submit_side) => submit_side,
None => return Err((
None => return Fut::A(async move {(
op.on_failed_submission(),
Err(Error::System(SystemError::SystemShuttingDown)),
)),
)}),
};

let do_submit = |submit_side: &mut super::SubmitSideOpen, sqe|{
Expand Down Expand Up @@ -103,12 +102,65 @@ where
}
};

// We return a future here, the subsequent code is going to await it.
Ok(slot.use_for_op(op, do_submit, submit_side))
});
match slot {
Some(slot) => Fut::B({
slot.use_for_op(op, do_submit, open)
}),
None => {
match open.slots.try_get_slot() {
slots::TryGetSlotResult::Draining => {
Fut::C(async move { (op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown)))})
},
slots::TryGetSlotResult::GotSlot(slot) => {
Fut::D(async move {
submit_side_weak.with_submit_side_open(|submit_side| {
let open: &mut super::SubmitSideOpen = match submit_side {
Some(submit_side) => submit_side,
None => return future::Either::Left(async move {(op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown)))}),
};
future::Either::Right(slot.use_for_op(op, do_submit, open))
}).await
})
},
slots::TryGetSlotResult::NoSlots(later) => {
// All slots are taken and we're waiting in line.
// If enabled, do some opportunistic completion processing to wake up futures that will release ops slots.
// This is in the hope that we'll wake ourselves up.

match ret {
Err(with_submit_side_err) => with_submit_side_err,
Ok(use_for_op_ret) => use_for_op_ret.await,
}
if *crate::env_tunables::PROCESS_COMPLETIONS_ON_QUEUE_FULL {
// TODO shouldn't we loop here until we've got a slot? This one-off poll doesn't make much sense.
open.submitter.submit().unwrap();
open.completion_side
.lock()
.unwrap()
.process_completions(ProcessCompletionsCause::Regular);
}
Fut::E(async move {
let slot = match later.await {
Ok(slot) => slot,
Err(_dropped) => return (op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown))),
};
submit_side_weak.with_submit_side_open(|submit_side| {
let open: &mut super::SubmitSideOpen = match submit_side {
Some(submit_side) => submit_side,
None => return future::Either::Left(async move {(op.on_failed_submission(), Err(Error::System(SystemError::SystemShuttingDown)))}),
};
future::Either::Right(slot.use_for_op(op, do_submit, open))
}).await
})
}
}
}
}
}).await
}

// Used by `execute_op` to avoid boxing the future returned by the `with_submit_side` closure.
#[auto_enums::enum_derive(Future)]
enum Fut<A, B, C, D, E> {
A(A),
B(B),
C(C),
D(D),
E(E),
}

0 comments on commit f0c3bba

Please sign in to comment.