Skip to content

Commit

Permalink
repro/regress-test the hypothesized root cause
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Jan 31, 2024
1 parent 88a29e1 commit fc7f84b
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 6 deletions.
4 changes: 3 additions & 1 deletion tokio-epoll-uring/src/system/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,9 @@ mod tests {
let (read_task_jh, mut writer) = rt.block_on(async move {
let (reader, writer) = os_pipe::pipe().unwrap();
let jh = tokio::spawn(async move {
let system = System::launch_with_testing(Some(testing)).await.unwrap();
let system = System::launch_with_testing(Some(testing), None)
.await
.unwrap();
let reader =
unsafe { OwnedFd::from_raw_fd(nix::unistd::dup(reader.as_raw_fd()).unwrap()) };
let buf = vec![0; 1];
Expand Down
12 changes: 8 additions & 4 deletions tokio-epoll-uring/src/system/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use super::{
submission::{SubmitSide, SubmitSideInner, SubmitSideNewArgs},
};

use slots::SlotsTesting;

/// A running `tokio_epoll_uring` system. Use [`Self::launch`] to start, then [`SystemHandle`] to interact.
pub struct System {
#[allow(dead_code)]
Expand Down Expand Up @@ -70,17 +72,19 @@ impl System {
///
/// The concept of *poller task* is described in [`crate::doc::design`].
pub async fn launch() -> Result<SystemHandle, LaunchResult> {
Self::launch_with_testing(None).await
Self::launch_with_testing(None, None).await
}

pub(crate) async fn launch_with_testing(
testing: Option<PollerTesting>,
poller_testing: Option<PollerTesting>,
slots_testing: Option<SlotsTesting>,
) -> Result<SystemHandle, LaunchResult> {
let id = SYSTEM_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

let (submit_side, poller_ready_fut) = {
// TODO: should we mlock `slots`? io_uring mmap is mlocked, slots are equally important for the system to function;
let (slots_submit_side, slots_completion_side, slots_poller) = super::slots::new(id);
let (slots_submit_side, slots_completion_side, slots_poller) =
super::slots::new(id, slots_testing.unwrap_or_default());

let uring = Box::new(
io_uring::IoUring::builder()
Expand Down Expand Up @@ -171,7 +175,7 @@ impl System {
completion_side,
system,
slots: slots_poller,
testing,
testing: poller_testing,
shutdown_rx,
});
(submit_side, poller_ready_fut)
Expand Down
84 changes: 84 additions & 0 deletions tokio-epoll-uring/src/system/slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,30 @@ struct SlotsInner {
unused_indices: Vec<usize>,
co_owner_live: [bool; co_owner::NUM_CO_OWNERS],
state: SlotsInnerState,
#[cfg(test)]
testing: SlotsTesting,
}

#[cfg(test)]
pub(crate) struct SlotsTesting {
pub(crate) test_on_wake: Box<
dyn Send
+ Sync
+ Fn() -> Option<tokio::sync::oneshot::Sender<tokio::sync::oneshot::Sender<()>>>,
>,
}

#[cfg(not(test))]
#[derive(Default)]
pub(crate) struct SlotsTesting;

#[cfg(test)]
impl Default for SlotsTesting {
fn default() -> Self {
Self {
test_on_wake: Box::new(|| None),
}
}
}

enum SlotsInnerState {
Expand All @@ -84,6 +108,9 @@ pub(crate) struct SlotHandle {
// FIXME: why is this weak?
slots_weak: SlotsWeak,
idx: usize,
#[cfg(test)]
test_on_wake:
std::sync::Mutex<Option<tokio::sync::oneshot::Sender<tokio::sync::oneshot::Sender<()>>>>,
}

enum Slot {
Expand All @@ -101,6 +128,7 @@ enum Slot {

pub(super) fn new(
id: usize,
#[allow(unused_variables)] testing: SlotsTesting,
) -> (
Slots<{ co_owner::SUBMIT_SIDE }>,
Slots<{ co_owner::COMPLETION_SIDE }>,
Expand All @@ -122,6 +150,8 @@ pub(super) fn new(
inner_weak: inner_weak.clone(),
},
},
#[cfg(test)]
testing,
})
});
fn make_co_owner<const O: usize>(inner: &Arc<Mutex<SlotsInner>>) -> Slots<O> {
Expand Down Expand Up @@ -192,6 +222,8 @@ impl SlotsInner {
match waiter.send(SlotHandle {
slots_weak: myself.clone(),
idx,
#[cfg(test)]
test_on_wake: Mutex::new((self.testing.test_on_wake)()),
}) {
Ok(()) => {
trace!("handed `idx` to a waiter");
Expand Down Expand Up @@ -373,6 +405,8 @@ impl Slots<{ co_owner::SUBMIT_SIDE }> {
SlotHandle {
slots_weak: myself.clone(),
idx,
#[cfg(test)]
test_on_wake: Mutex::new((inner.testing.test_on_wake)()),
}
}),
None => {
Expand Down Expand Up @@ -546,6 +580,16 @@ impl SlotHandle {
"poll_fn closure returns Pending in that case"
);

#[cfg(test)]
{
let on_wake = { slot.test_on_wake.lock().unwrap().take() };
if let Some(on_wake) = on_wake {
let (tx, rx) = tokio::sync::oneshot::channel();
on_wake.send(tx).unwrap();
rx.await.unwrap();
}
}

let res = match inspect_slot_res {
InspectSlotResult::AlreadyDone(r) => r,
InspectSlotResult::NeedToWait => {
Expand Down Expand Up @@ -607,3 +651,43 @@ impl Slot {
}
}
}

#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};

use crate::{system::slots::SlotsTesting, System};

// Regression-test for issue https://github.com/neondatabase/tokio-epoll-uring/issues/37
#[tokio::test]
async fn test_wait_for_completion_drop_behavior() {
let (tx, rx) = tokio::sync::oneshot::channel();
let tx = Arc::new(Mutex::new(Some(tx)));
let system = System::launch_with_testing(
None,
Some(SlotsTesting {
test_on_wake: Box::new(move || {
Some(
tx.lock()
.unwrap()
.take()
.expect("should only be called once, we only submit one nop here"),
)
}),
}),
)
.await
.unwrap();
let nop = tokio::spawn(system.nop());
let at_yield_point: tokio::sync::oneshot::Sender<()> = rx.await.unwrap();
nop.abort();
let Err(join_err) = nop.await else {
panic!("expecting join error after abort");
};
assert!(join_err.is_cancelled());
assert!(
at_yield_point.is_closed(),
"abort drops the nop op, and hence the oneshot receiver"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl SharedSystemHandle {
pub(crate) async fn launch_with_testing(
poller_testing: Option<PollerTesting>,
) -> Result<Self, LaunchResult> {
let handle = System::launch_with_testing(poller_testing).await?;
let handle = System::launch_with_testing(poller_testing, None).await?;
Ok(Self(Arc::new(RwLock::new(Some(handle)))))
}

Expand Down

0 comments on commit fc7f84b

Please sign in to comment.