From 52b34846efe2c883488b2f9ee015f98a15ce97cc Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 31 Jan 2024 18:19:52 +0000 Subject: [PATCH] fix: dropping op future at yield_now() after it completed causes panic --- tokio-epoll-uring/src/system/slots.rs | 94 ++++++++++++--------------- 1 file changed, 40 insertions(+), 54 deletions(-) diff --git a/tokio-epoll-uring/src/system/slots.rs b/tokio-epoll-uring/src/system/slots.rs index 52b5200..ad5a875 100644 --- a/tokio-epoll-uring/src/system/slots.rs +++ b/tokio-epoll-uring/src/system/slots.rs @@ -460,6 +460,8 @@ impl SlotHandle { op: O, ) -> (O::Resources, Result>) { let slot = self; + + // invariant: op.is_some() <=> we haven't observed the poll_fn below complete yet let op = std::sync::Mutex::new(Some(op)); // If this future gets dropped _before_ the op completes, we need to make sure @@ -481,7 +483,10 @@ impl SlotHandle { }; let storage = &mut inner.storage; let slot_storage_mut = &mut storage[slot.idx]; - let slot_mut = slot_storage_mut.as_mut().unwrap(); + // the invariant is: `op.is_some()` <=> we haven't observed the poll_fn below complete yet + let slot_mut = slot_storage_mut + .as_mut() + .expect("op is Some(), so the poll_fn below hasn't returned the slot yet"); match &mut *slot_mut { Slot::Pending { .. } => { // The resource needs to be kept alive until the op completes. @@ -530,16 +535,15 @@ impl SlotHandle { // Now that we've set up the scope guard, get to business. // Inspect the slot to check whether the poller task already processed the completion. // If it has, good for us. - // If not, set up a oneshot to notify us. (TODO: in the hand-rolled futures, this was simply a std::task::Waker, now it's a oneshot.) - enum InspectSlotResult { - AlreadyDone(i32), - NeedToWait, - ShutDown, - } + // If not, store a waker in the slot so the poller task will wake us up to poll again + // and observe the Slot::Ready then. 
+ // + // If we get cancelled in the meantime (i.e., this future gets dropped), the scopeguard + // will make sure the resources stay alive until the op is complete. let mut poll_count = 0; - let inspect_slot_res = poll_fn(|cx| { + let poll_res = poll_fn(|cx| { poll_count += 1; - let inspect_slot_res = slot.slots_weak.try_upgrade_mut(move |inner| { + let try_upgrade_res = slot.slots_weak.try_upgrade_mut(|inner| { let storage = &mut inner.storage; let slot_storage_ref = &mut storage[slot.idx]; let slot_mut = slot_storage_ref.as_mut().unwrap(); @@ -551,7 +555,7 @@ impl SlotHandle { if !cx.waker().will_wake(waker_mut_ref) { waker.replace(cx.waker().clone()); } - InspectSlotResult::NeedToWait + Poll::Pending } Slot::PendingButFutureDropped { .. } => { unreachable!("if it's dropped, it's not pollable") @@ -560,26 +564,38 @@ impl SlotHandle { trace!("op is ready, returning resources to user"); let res = *res; inner.return_slot(slot.idx); - InspectSlotResult::AlreadyDone(res) + // SAFETY: the slot is ready, so, ownership is back with userspace. + #[allow(unused_unsafe)] + unsafe { + let op = op.lock().unwrap().take().unwrap(); + Poll::Ready(op.on_op_completion(res)) + } } } }); - let inspect_slot_res = match inspect_slot_res { - Err(()) => InspectSlotResult::ShutDown, - Ok(res) => res, - }; - match inspect_slot_res { - InspectSlotResult::NeedToWait => Poll::Pending, - x => Poll::Ready(x), + match try_upgrade_res { + Err(()) => { + // SAFETY: + // This future has an outdated view of the system; it shut down in the meantime. + // Shutdown makes sure that all inflight ops complete, so, + // these resources are no longer owned by the kernel and can be returned as an error. 
+ #[allow(unused_unsafe)] + unsafe { + let op = op.lock().unwrap().take().unwrap(); + Poll::Ready(( + op.on_failed_submission(), + Err(Error::System(SystemError::SystemShuttingDown)), + )) + } + } + Ok(Poll::Ready((resources, res))) => { + Poll::Ready((resources, res.map_err(Error::Op))) + } + Ok(Poll::Pending) => Poll::Pending, } }) .await; assert!(poll_count >= 1); - assert!( - !matches!(inspect_slot_res, InspectSlotResult::NeedToWait), - "poll_fn closure returns Pending in that case" - ); - #[cfg(test)] { let on_wake = { slot.test_on_wake.lock().unwrap().take() }; @@ -589,40 +605,10 @@ impl SlotHandle { rx.await.unwrap(); } } - - let res = match inspect_slot_res { - InspectSlotResult::AlreadyDone(r) => r, - InspectSlotResult::NeedToWait => { - unreachable!() - } - InspectSlotResult::ShutDown => { - // SAFETY: - // This future has an outdated view of the system; it shut down in the meantime. - // Shutdown makes sure that all inflight ops complete, so, - // these resources are no longer owned by the kernel and can be returned as an error. - #[allow(unused_unsafe)] - unsafe { - let op = op.lock().unwrap().take().unwrap(); - return ( - op.on_failed_submission(), - Err(Error::System(SystemError::SystemShuttingDown)), - ); - } - } - }; - if poll_count == 1 && *crate::env_tunables::YIELD_TO_EXECUTOR_IF_READY_ON_FIRST_POLL { tokio::task::yield_now().await; } - - // SAFETY: - // We got a result, so, kernel is done with the operation and ownership is back with us. - #[allow(unused_unsafe)] - let (resources, res) = unsafe { - let op = op.lock().unwrap().take().expect("we only take() it in drop(), and evidently drop() hasn't happened yet because we're executing a method on self"); - op.on_op_completion(res) - }; - (resources, res.map_err(Error::Op)) + poll_res } }