Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use tokio::sync::Mutex-based guard for to keep hold of an open submit side #18

Merged
merged 19 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion tokio-epoll-uring/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ repository = "https://github.com/neondatabase/tokio-epoll-uring"
license = "MIT OR Apache-2.0"

[dependencies]
auto_enums = "0.8.2"
futures = "0.3.28"
io-uring = "0.6.0"
once_cell = "1.18.0"
Expand Down
49 changes: 30 additions & 19 deletions tokio-epoll-uring/src/system/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use io_uring::CompletionQueue;
use tokio::sync::{self, broadcast, mpsc, oneshot};
use tracing::{debug, info, info_span, trace, Instrument};

use crate::util::oneshot_nonconsuming;
use crate::{system::submission::SubmitSideInner, util::oneshot_nonconsuming};

use super::{
lifecycle::{ShutdownRequest, System},
slots::{self, Slots},
submission::SubmitSideOpen,
};

pub(crate) struct CompletionSide {
Expand Down Expand Up @@ -63,21 +64,20 @@ pub(crate) struct PollerNewArgs {
pub system: System,
pub(crate) slots: Slots<{ slots::co_owner::POLLER }>,
pub testing: Option<PollerTesting>,
pub shutdown_rx: oneshot_nonconsuming::Receiver<ShutdownRequest>,
}

impl Poller {
pub(crate) async fn launch(
args: PollerNewArgs,
) -> oneshot_nonconsuming::SendOnce<ShutdownRequest> {
pub(crate) fn launch(args: PollerNewArgs) -> impl std::future::Future<Output = ()> + Send {
let PollerNewArgs {
id,
uring_fd,
completion_side,
system,
slots: ops,
testing,
shutdown_rx,
} = args;
let (shutdown_tx, shutdown_rx) = oneshot_nonconsuming::channel();
let poller_task_state = Arc::new(Mutex::new(Poller {
id,
state: PollerState::RunningInTask(Arc::new(Mutex::new(PollerStateInner {
Expand All @@ -94,18 +94,19 @@ impl Poller {
poller_ready_tx,
testing,
));
poller_ready_rx
.await
// TODO make launch fallible and propagate this error
.expect("poller task must not die during startup");
shutdown_tx
async move {
poller_ready_rx
.await
// TODO make launch fallible and propagate this error
.expect("poller task must not die during startup");
}
}
}

enum PollerState {
RunningInTask(Arc<Mutex<PollerStateInner>>),
RunningInThread(Arc<Mutex<PollerStateInner>>),
ShuttingDownPreemptible(Arc<Mutex<PollerStateInner>>, Arc<ShutdownRequest>),
ShuttingDownPreemptible(Arc<Mutex<PollerStateInner>>, Arc<ShutdownRequestImpl>),
ShuttingDownNoMorePreemptible,
ShutDown,
}
Expand Down Expand Up @@ -293,9 +294,8 @@ async fn poller_impl(
};
let shutdown_req_shared = match maybe_shutdown_req_shared {
None => {
let shutdown_req: ShutdownRequest = tokio::select! {
req = poller_impl_impl(Arc::clone(&inner_shared), preempt_in_epoll) => { req },
};
let shutdown_req: ShutdownRequestImpl =
poller_impl_impl(Arc::clone(&inner_shared), preempt_in_epoll).await;
let shared = Arc::new(shutdown_req);
poller.lock().unwrap().state = PollerState::ShuttingDownPreemptible(
Arc::clone(&inner_shared),
Expand All @@ -319,12 +319,13 @@ async fn poller_impl(
}

// 1. Prevent new ops from being submitted and wait for all inflight ops to finish.
// `SystemHandleInner::shutdown` already plugged the sumit side & transitioned Ops to `Draining` state.
// `poller_impl_impl` already transitioned `slots` to `Draining` state.
// So, all that's left is to wait for pending count to reach 0.
loop {
{
let inner_guard = inner_shared.lock().unwrap();
let mut completion_side_guard = inner_guard.completion_side.lock().unwrap();
completion_side_guard.slots.transition_to_draining();
let pending_count = completion_side_guard.slots.pending_slot_count();
debug!(pending_count, "waiting for pending operations to complete");
if pending_count == 0 {
Expand Down Expand Up @@ -393,10 +394,15 @@ async fn poller_impl(
})()
}

pub(crate) struct ShutdownRequestImpl {
pub(crate) done_tx: Option<tokio::sync::oneshot::Sender<()>>,
pub(crate) submit_side_open: SubmitSideOpen,
}

async fn poller_impl_impl(
inner: Arc<Mutex<PollerStateInner>>,
mut preempt_in_epoll: Option<tokio::sync::broadcast::Receiver<mpsc::UnboundedSender<()>>>,
) -> ShutdownRequest {
) -> ShutdownRequestImpl {
let (uring_fd, completion_side, mut shutdown_rx) = {
let mut inner_guard = inner.lock().unwrap();
let PollerStateInner {
Expand Down Expand Up @@ -436,9 +442,14 @@ async fn poller_impl_impl(
}
rx = shutdown_rx.recv() => {
match rx {
crate::util::oneshot_nonconsuming::RecvResult::FirstRecv(req) => {
crate::util::oneshot_nonconsuming::RecvResult::FirstRecv(ShutdownRequest { done_tx, submit_side_inner }) => {
tracing::debug!("got explicit shutdown request");
return req;
let mut inner = submit_side_inner.lock().await;
let open = match std::mem::replace(&mut *inner, SubmitSideInner::ShutDownInitiated) {
SubmitSideInner::Open(open) => open,
SubmitSideInner::ShutDownInitiated => unreachable!("poller_impl transitions to state ShuttingDownPreemptible when we return a shutdown request, so, it won't call poller_impl_impl again"),
};
return ShutdownRequestImpl { done_tx, submit_side_open: open };
}
crate::util::oneshot_nonconsuming::RecvResult::NotFirstRecv => {
panic!("once we observe a shutdown request, we return it and the caller does through with shutdown, without a chance for the executor to intervene")
Expand Down Expand Up @@ -561,7 +572,7 @@ mod tests {
.build()
.unwrap();
second_rt.block_on(async move {
let mut shutdown_done_fut = shutdown_done_fut;
let mut shutdown_done_fut = Box::pin(shutdown_done_fut);
tokio::select! {
// TODO don't rely on timing
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => { }
Expand Down
39 changes: 23 additions & 16 deletions tokio-epoll-uring/src/system/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ pub mod thread_local;

use io_uring::{CompletionQueue, SubmissionQueue, Submitter};

use crate::system::{submission::SubmitSideOpen, RING_SIZE};
use crate::{
system::{completion::ShutdownRequestImpl, RING_SIZE},
util::oneshot_nonconsuming,
};

use super::{
completion::{CompletionSide, Poller, PollerNewArgs, PollerTesting},
lifecycle::handle::SystemHandle,
slots::{self, Slots},
submission::{SubmitSide, SubmitSideNewArgs},
submission::{SubmitSide, SubmitSideInner, SubmitSideNewArgs},
};

/// A running `tokio_epoll_uring` system. Use [`Self::launch`] to start, then [`SystemHandle`] to interact.
Expand Down Expand Up @@ -59,12 +62,15 @@ impl System {
slots_completion_side,
)));

let (shutdown_tx, shutdown_rx) = oneshot_nonconsuming::channel();

let submit_side = SubmitSide::new(SubmitSideNewArgs {
id,
submitter,
sq,
slots: slots_submit_side,
completion_side: Arc::clone(&completion_side),
shutdown_tx,
});
let system = System {
id,
Expand All @@ -77,42 +83,41 @@ impl System {
system,
slots: slots_poller,
testing,
shutdown_rx,
});
(submit_side, poller_ready_fut)
};

let poller_shutdown_tx = poller_ready_fut.await;
poller_ready_fut.await;

SystemHandle::new(id, submit_side, poller_shutdown_tx)
SystemHandle::new(id, submit_side)
}
}

pub(crate) struct ShutdownRequest {
pub done_tx: tokio::sync::oneshot::Sender<()>,
pub open_state: SubmitSideOpen,
pub done_tx: Option<tokio::sync::oneshot::Sender<()>>,
pub submit_side_inner: Arc<tokio::sync::Mutex<SubmitSideInner>>,
}

pub(crate) fn poller_impl_finish_shutdown(
system: System,
ops: Slots<{ slots::co_owner::POLLER }>,
completion_side: Arc<Mutex<CompletionSide>>,
req: ShutdownRequest,
shutdown_request: ShutdownRequestImpl,
) {
tracing::info!("poller shutdown start");
scopeguard::defer_on_success! {tracing::info!("poller shutdown end")};
scopeguard::defer_on_unwind! {tracing::error!("poller shutdown panic")};

let System { id: _, split_uring } = { system };

let ShutdownRequest {
done_tx,
open_state,
} = req;
let ShutdownRequestImpl {
mut done_tx,
submit_side_open,
} = { shutdown_request };

let (submitter, sq) = open_state.deconstruct();
let completion_side = Arc::try_unwrap(completion_side)
.ok()
.expect("we plugged the SubmitSide, so, all refs to CompletionSide are gone");
let (submitter, sq) = submit_side_open.deconstruct();
let completion_side = Arc::try_unwrap(completion_side).ok().unwrap();
let completion_side = Mutex::into_inner(completion_side).unwrap();

// Unsplit the uring
Expand Down Expand Up @@ -154,5 +159,7 @@ pub(crate) fn poller_impl_finish_shutdown(

// notify about completed shutdown;
// ignore send errors, interest may be gone if it's implicit shutdown through SystemHandle::drop
let _ = done_tx.send(());
if let Some(done_tx) = done_tx.take() {
let _ = done_tx.send(());
}
}
45 changes: 5 additions & 40 deletions tokio-epoll-uring/src/system/lifecycle/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use crate::{
system::submission::{op_fut::execute_op, SubmitSide},
};

use super::ShutdownRequest;

/// Owned handle to the [`System`](crate::System) created by [`System::launch`](crate::System::launch).
///
/// The only use of this handle is to shut down the [`System`](crate::System).
Expand All @@ -28,31 +26,12 @@ struct SystemHandleInner {
#[allow(dead_code)]
pub(super) id: usize,
pub(crate) submit_side: SubmitSide,
pub(super) shutdown_tx: crate::util::oneshot_nonconsuming::SendOnce<ShutdownRequest>,
}

impl Drop for SystemHandle {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
let wait_shutdown_done = inner.shutdown();
// we don't care about the result
drop(wait_shutdown_done);
}
}
}

impl SystemHandle {
pub(crate) fn new(
id: usize,
submit_side: SubmitSide,
shutdown_tx: crate::util::oneshot_nonconsuming::SendOnce<ShutdownRequest>,
) -> Self {
pub(crate) fn new(id: usize, submit_side: SubmitSide) -> Self {
SystemHandle {
inner: Some(SystemHandleInner {
id,
submit_side,
shutdown_tx,
}),
inner: Some(SystemHandleInner { id, submit_side }),
}
}

Expand All @@ -78,7 +57,7 @@ impl SystemHandle {
/// the shutdown procedure makes sure to continue in a new `std::thread`.
///
/// So, it is safe to drop the tokio runtime on which the poller task runs.
pub fn initiate_shutdown(mut self) -> impl std::future::Future<Output = ()> + Send + Unpin {
pub fn initiate_shutdown(mut self) -> impl std::future::Future<Output = ()> + Send {
let inner = self
.inner
.take()
Expand Down Expand Up @@ -107,22 +86,8 @@ impl std::future::Future for WaitShutdownFut {
}

impl SystemHandleInner {
fn shutdown(self) -> impl std::future::Future<Output = ()> + Send + Unpin {
let SystemHandleInner {
id: _,
submit_side,
shutdown_tx,
} = self;
let (done_tx, done_rx) = tokio::sync::oneshot::channel();
let req = ShutdownRequest {
done_tx,
open_state: submit_side.plug(),
};
shutdown_tx
.send(req)
.ok()
.expect("implementation error: poller task must not die before SystemHandle");
WaitShutdownFut { done_rx }
fn shutdown(self) -> impl std::future::Future<Output = ()> + Send {
self.submit_side.shutdown()
}
}

Expand Down
Loading
Loading