Skip to content

Commit

Permalink
fix(runtime): drop runnables manually
Browse files Browse the repository at this point in the history
  • Loading branch information
Berrysoft committed Nov 25, 2024
1 parent c53a352 commit 17b65be
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
22 changes: 15 additions & 7 deletions compio-runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
marker::PhantomData,
panic::AssertUnwindSafe,
rc::Rc,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
Expand Down Expand Up @@ -85,9 +86,8 @@ impl RunnableQueue {
/// The async runtime of compio. It is a thread local runtime, and cannot be
/// sent to other threads.
pub struct Runtime {
// The runnable queue should live longer than the proactor.
runnables: Box<RunnableQueue>,
driver: RefCell<Proactor>,
runnables: Arc<RunnableQueue>,
#[cfg(feature = "time")]
timer_runtime: RefCell<TimerRuntime>,
event_interval: usize,
Expand All @@ -110,7 +110,7 @@ impl Runtime {
fn with_builder(builder: &RuntimeBuilder) -> io::Result<Self> {
Ok(Self {
driver: RefCell::new(builder.proactor_builder.build()?),
runnables: Box::new(RunnableQueue::new()),
runnables: Arc::new(RunnableQueue::new()),
#[cfg(feature = "time")]
timer_runtime: RefCell::new(TimerRuntime::new()),
event_interval: builder.event_interval,
Expand Down Expand Up @@ -158,16 +158,13 @@ impl Runtime {
///
/// The caller should ensure the captured lifetime long enough.
pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> Task<F::Output> {
let runnables = self.runnables.as_ref() as *const RunnableQueue;
let runnables = self.runnables.clone();
let handle = self
.driver
.borrow()
.handle()
.expect("cannot create notify handle of the proactor");
let schedule = move |runnable| {
// The schedule closure are owned by runnables, and the runnables are owned by
// the queue. This is a self-reference.
let runnables = &*runnables;
runnables.schedule(runnable, &handle);
};
let (runnable, task) = async_task::spawn_unchecked(future, schedule);
Expand Down Expand Up @@ -378,6 +375,17 @@ impl Runtime {
}
}

impl Drop for Runtime {
fn drop(&mut self) {
self.enter(|| {
while self.runnables.sync_runnables.pop().is_some() {}
let local_runnables = unsafe { self.runnables.local_runnables.get_unchecked() };
let mut local_runnables = local_runnables.borrow_mut();
while local_runnables.pop_front().is_some() {}
})
}
}

impl AsRawFd for Runtime {
fn as_raw_fd(&self) -> RawFd {
self.driver.borrow().as_raw_fd()
Expand Down
3 changes: 1 addition & 2 deletions compio-runtime/src/runtime/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ impl<T: OpCode> Future for OpFuture<T> {
impl<T: OpCode> Drop for OpFuture<T> {
fn drop(&mut self) {
if let Some(key) = self.key.take() {
// If there's no runtime, it's OK to forget it.
Runtime::try_with_current(|r| r.cancel_op(key)).ok();
Runtime::with_current(|r| r.cancel_op(key));
}
}
}
3 changes: 1 addition & 2 deletions compio-runtime/src/runtime/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ impl Future for TimerFuture {

impl Drop for TimerFuture {
fn drop(&mut self) {
// If there's no runtime, it's OK to forget it.
Runtime::try_with_current(|r| r.cancel_timer(self.key)).ok();
Runtime::with_current(|r| r.cancel_timer(self.key));
}
}

Expand Down

0 comments on commit 17b65be

Please sign in to comment.