Skip to content

Commit

Permalink
fix: dropping op future at yield_now() after it completed causes panic
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Jan 31, 2024
1 parent d9a158a commit 3a71262
Showing 1 changed file with 40 additions and 54 deletions.
94 changes: 40 additions & 54 deletions tokio-epoll-uring/src/system/slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ impl SlotHandle {
op: O,
) -> (O::Resources, Result<O::Success, Error<O::Error>>) {
let slot = self;

// invariant: op.is_some() <=> we haven't observed the poll_fn below complete yet
let op = std::sync::Mutex::new(Some(op));

// If this future gets dropped _before_ the op completes, we need to make sure
Expand All @@ -481,7 +483,10 @@ impl SlotHandle {
};
let storage = &mut inner.storage;
let slot_storage_mut = &mut storage[slot.idx];
let slot_mut = slot_storage_mut.as_mut().unwrap();
// the invariant is: `op.is_some() <=> `
let slot_mut = slot_storage_mut
.as_mut()
.expect("op is Some(), so the poll_fn below hasn't returned the slot yet");
match &mut *slot_mut {
Slot::Pending { .. } => {
// The resource needs to be kept alive until the op completes.
Expand Down Expand Up @@ -530,16 +535,15 @@ impl SlotHandle {
// Now that we've set up the scope guard, get to business.
// Inspect the slot to check whether the poller task already processed the completion.
// If it has, good for us.
// If not, set up a oneshot to notify us. (TODO: in the hand-rolled futures, this was simply a std::task::Waker, now it's a oneshot.)
enum InspectSlotResult {
AlreadyDone(i32),
NeedToWait,
ShutDown,
}
// If not, store a waker in the slot so the poller task will wake us up to poll again
// and observe the Slot::Ready then.
//
// If we get cancelled in the meantime (i.e., this future gets dropped), the scopeguard
// will make sure the resources stay alive until the op is complete.
let mut poll_count = 0;
let inspect_slot_res = poll_fn(|cx| {
let poll_res = poll_fn(|cx| {
poll_count += 1;
let inspect_slot_res = slot.slots_weak.try_upgrade_mut(move |inner| {
let try_upgrade_res = slot.slots_weak.try_upgrade_mut(|inner| {
let storage = &mut inner.storage;
let slot_storage_ref = &mut storage[slot.idx];
let slot_mut = slot_storage_ref.as_mut().unwrap();
Expand All @@ -551,7 +555,7 @@ impl SlotHandle {
if !cx.waker().will_wake(waker_mut_ref) {
waker.replace(cx.waker().clone());
}
InspectSlotResult::NeedToWait
Poll::Pending
}
Slot::PendingButFutureDropped { .. } => {
unreachable!("if it's dropped, it's not pollable")
Expand All @@ -560,26 +564,38 @@ impl SlotHandle {
trace!("op is ready, returning resources to user");
let res = *res;
inner.return_slot(slot.idx);
InspectSlotResult::AlreadyDone(res)
// SAFETY: the slot is ready, so, ownership is back with userspace.
#[allow(unused_unsafe)]
unsafe {
let op = op.lock().unwrap().take().unwrap();
Poll::Ready(op.on_op_completion(res))
}
}
}
});
let inspect_slot_res = match inspect_slot_res {
Err(()) => InspectSlotResult::ShutDown,
Ok(res) => res,
};
match inspect_slot_res {
InspectSlotResult::NeedToWait => Poll::Pending,
x => Poll::Ready(x),
match try_upgrade_res {
Err(()) => {
// SAFETY:
// This future has an outdated view of the system; it shut down in the meantime.
// Shutdown makes sure that all inflight ops complete, so,
// these resources are no longer owned by the kernel and can be returned as an error.
#[allow(unused_unsafe)]
unsafe {
let op = op.lock().unwrap().take().unwrap();
Poll::Ready((
op.on_failed_submission(),
Err(Error::System(SystemError::SystemShuttingDown)),
))
}
}
Ok(Poll::Ready((resources, res))) => {
Poll::Ready((resources, res.map_err(Error::Op)))
}
Ok(Poll::Pending) => Poll::Pending,
}
})
.await;
assert!(poll_count >= 1);
assert!(
!matches!(inspect_slot_res, InspectSlotResult::NeedToWait),
"poll_fn closure returns Pending in that case"
);

#[cfg(test)]
{
let on_wake = { slot.test_on_wake.lock().unwrap().take() };
Expand All @@ -589,40 +605,10 @@ impl SlotHandle {
rx.await.unwrap();
}
}

let res = match inspect_slot_res {
InspectSlotResult::AlreadyDone(r) => r,
InspectSlotResult::NeedToWait => {
unreachable!()
}
InspectSlotResult::ShutDown => {
// SAFETY:
// This future has an outdated view of the system; it shut down in the meantime.
// Shutdown makes sure that all inflight ops complete, so,
// these resources are no longer owned by the kernel and can be returned as an error.
#[allow(unused_unsafe)]
unsafe {
let op = op.lock().unwrap().take().unwrap();
return (
op.on_failed_submission(),
Err(Error::System(SystemError::SystemShuttingDown)),
);
}
}
};

if poll_count == 1 && *crate::env_tunables::YIELD_TO_EXECUTOR_IF_READY_ON_FIRST_POLL {
tokio::task::yield_now().await;
}

// SAFETY:
// We got a result, so, kernel is done with the operation and ownership is back with us.
#[allow(unused_unsafe)]
let (resources, res) = unsafe {
let op = op.lock().unwrap().take().expect("we only take() it in drop(), and evidently drop() hasn't happened yet because we're executing a method on self");
op.on_op_completion(res)
};
(resources, res.map_err(Error::Op))
poll_res
}
}

Expand Down

0 comments on commit 3a71262

Please sign in to comment.