From 17b65be457570e18f2d0c25ab128172c397e39d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Tue, 26 Nov 2024 03:36:55 +0900 Subject: [PATCH] fix(runtime): drop runnables manually --- compio-runtime/src/runtime/mod.rs | 22 +++++++++++++++------- compio-runtime/src/runtime/op.rs | 3 +-- compio-runtime/src/runtime/time.rs | 3 +-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index 2b9975b0..9ac443e7 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -7,6 +7,7 @@ use std::{ marker::PhantomData, panic::AssertUnwindSafe, rc::Rc, + sync::Arc, task::{Context, Poll}, time::Duration, }; @@ -85,9 +86,8 @@ impl RunnableQueue { /// The async runtime of compio. It is a thread local runtime, and cannot be /// sent to other threads. pub struct Runtime { - // The runnable queue should live longer than the proactor. - runnables: Box, driver: RefCell, + runnables: Arc, #[cfg(feature = "time")] timer_runtime: RefCell, event_interval: usize, @@ -110,7 +110,7 @@ impl Runtime { fn with_builder(builder: &RuntimeBuilder) -> io::Result { Ok(Self { driver: RefCell::new(builder.proactor_builder.build()?), - runnables: Box::new(RunnableQueue::new()), + runnables: Arc::new(RunnableQueue::new()), #[cfg(feature = "time")] timer_runtime: RefCell::new(TimerRuntime::new()), event_interval: builder.event_interval, @@ -158,16 +158,13 @@ impl Runtime { /// /// The caller should ensure the captured lifetime long enough. pub unsafe fn spawn_unchecked(&self, future: F) -> Task { - let runnables = self.runnables.as_ref() as *const RunnableQueue; + let runnables = self.runnables.clone(); let handle = self .driver .borrow() .handle() .expect("cannot create notify handle of the proactor"); let schedule = move |runnable| { - // The schedule closure are owned by runnables, and the runnables are owned by - // the queue. This is a self-reference. - let runnables = &*runnables; runnables.schedule(runnable, &handle); }; let (runnable, task) = async_task::spawn_unchecked(future, schedule); @@ -378,6 +375,17 @@ impl Runtime { } } +impl Drop for Runtime { + fn drop(&mut self) { + self.enter(|| { + while self.runnables.sync_runnables.pop().is_some() {} + let local_runnables = unsafe { self.runnables.local_runnables.get_unchecked() }; + let mut local_runnables = local_runnables.borrow_mut(); + while local_runnables.pop_front().is_some() {} + }) + } +} + impl AsRawFd for Runtime { fn as_raw_fd(&self) -> RawFd { self.driver.borrow().as_raw_fd() diff --git a/compio-runtime/src/runtime/op.rs b/compio-runtime/src/runtime/op.rs index c14be593..bd2c3361 100644 --- a/compio-runtime/src/runtime/op.rs +++ b/compio-runtime/src/runtime/op.rs @@ -38,8 +38,7 @@ impl Future for OpFuture { impl Drop for OpFuture { fn drop(&mut self) { if let Some(key) = self.key.take() { - // If there's no runtime, it's OK to forget it. - Runtime::try_with_current(|r| r.cancel_op(key)).ok(); + Runtime::with_current(|r| r.cancel_op(key)); } } } diff --git a/compio-runtime/src/runtime/time.rs b/compio-runtime/src/runtime/time.rs index 5d8217b3..7049898f 100644 --- a/compio-runtime/src/runtime/time.rs +++ b/compio-runtime/src/runtime/time.rs @@ -144,8 +144,7 @@ impl Future for TimerFuture { impl Drop for TimerFuture { fn drop(&mut self) { - // If there's no runtime, it's OK to forget it. - Runtime::try_with_current(|r| r.cancel_timer(self.key)).ok(); + Runtime::with_current(|r| r.cancel_timer(self.key)); } }