From f20a8489b5a6e81d700e93c2eb85c6ab9e32bb2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Mon, 2 Dec 2024 20:09:21 +0900 Subject: [PATCH] fix(runtime): remove handwritten state machine --- compio-runtime/src/runtime/mod.rs | 66 +++++++++++++++--------------- compio-runtime/src/runtime/time.rs | 26 +++--------- compio-runtime/src/time.rs | 4 +- 3 files changed, 41 insertions(+), 55 deletions(-) diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index b164c13e..65960dbc 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -223,6 +223,9 @@ impl Runtime { let closure = async move { let mut op = op; loop { + // It is safe and sound to use `submit` here because the task is spawned + // immediately. + #[allow(deprecated)] match self.submit(op).await { BufResult(Ok(_), rop) => break rop.into_inner(), BufResult(Err(_), rop) => op = rop, @@ -262,7 +265,13 @@ impl Runtime { /// Submit an operation to the runtime. /// /// You only need this when authoring your own [`OpCode`]. + /// + /// It is safe to send the returned future to another runtime and poll it, + /// but the exact behavior is not guaranteed, e.g. it may return pending + /// forever or else. + #[deprecated = "use compio::runtime::submit instead"] pub fn submit(&self, op: T) -> impl Future> { + #[allow(deprecated)] self.submit_with_flags(op).map(|(res, _)| res) } @@ -272,6 +281,11 @@ impl Runtime { /// the flags /// /// You only need this when authoring your own [`OpCode`]. + /// + /// It is safe to send the returned future to another runtime and poll it, + /// but the exact behavior is not guaranteed, e.g. it may return pending + /// forever or else. + #[deprecated = "use compio::runtime::submit_with_flags instead"] pub fn submit_with_flags( &self, op: T, @@ -286,11 +300,6 @@ impl Runtime { } } - #[cfg(feature = "time")] - pub(crate) fn create_timer(&self, instant: std::time::Instant) -> impl Future { - TimerFuture::new(instant) - } - pub(crate) fn cancel_op(&self, op: Key) { self.driver.borrow_mut().cancel(op); } @@ -313,21 +322,6 @@ impl Runtime { }) } - #[cfg(feature = "time")] - pub(crate) fn register_timer( - &self, - cx: &mut Context, - instant: std::time::Instant, - ) -> Option { - let mut timer_runtime = self.timer_runtime.borrow_mut(); - if let Some(key) = timer_runtime.insert(instant) { - timer_runtime.update_waker(key, cx.waker().clone()); - Some(key) - } else { - None - } - } - #[cfg(feature = "time")] pub(crate) fn poll_timer(&self, cx: &mut Context, key: usize) -> Poll<()> { instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key); @@ -506,29 +500,37 @@ pub fn spawn_blocking( /// Submit an operation to the current runtime, and return a future for it. /// -/// It is safe but unspecified behavior to send the returned future to another -/// runtime and poll it. -/// /// ## Panics /// /// This method doesn't create runtime. It tries to obtain the current runtime /// by [`Runtime::with_current`]. -pub fn submit(op: T) -> impl Future> { - Runtime::with_current(|r| r.submit(op)) +pub async fn submit(op: T) -> BufResult { + submit_with_flags(op).await.0 } /// Submit an operation to the current runtime, and return a future for it with /// flags. /// -/// It is safe but unspecified behavior to send the returned future to another -/// runtime and poll it. -/// /// ## Panics /// /// This method doesn't create runtime. It tries to obtain the current runtime /// by [`Runtime::with_current`]. -pub fn submit_with_flags( - op: T, -) -> impl Future, u32)> { - Runtime::with_current(|r| r.submit_with_flags(op)) +pub async fn submit_with_flags(op: T) -> (BufResult, u32) { + let state = Runtime::with_current(|r| r.submit_raw(op)); + match state { + PushEntry::Pending(user_data) => OpFuture::new(user_data).await, + PushEntry::Ready(res) => { + // submit_flags won't be ready immediately, if ready, it must be error without + // flags, or the flags are not necessary + (res, 0) + } + } +} + +#[cfg(feature = "time")] +pub(crate) async fn create_timer(instant: std::time::Instant) { + let key = Runtime::with_current(|r| r.timer_runtime.borrow_mut().insert(instant)); + if let Some(key) = key { + TimerFuture::new(key).await + } } diff --git a/compio-runtime/src/runtime/time.rs b/compio-runtime/src/runtime/time.rs index 5776f869..c837957b 100644 --- a/compio-runtime/src/runtime/time.rs +++ b/compio-runtime/src/runtime/time.rs @@ -7,7 +7,6 @@ use std::{ time::{Duration, Instant}, }; -use futures_util::future::Either; use slab::Slab; use crate::runtime::Runtime; @@ -125,39 +124,26 @@ impl TimerRuntime { } pub struct TimerFuture { - key: Either, + key: usize, } impl TimerFuture { - pub fn new(instant: Instant) -> Self { - Self { - key: Either::Left(instant), - } + pub fn new(key: usize) -> Self { + Self { key } } } impl Future for TimerFuture { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Runtime::with_current(|r| match self.key { - Either::Left(instant) => match r.register_timer(cx, instant) { - Some(key) => { - self.key = Either::Right(key); - Poll::Pending - } - None => Poll::Ready(()), - }, - Either::Right(key) => r.poll_timer(cx, key), - }) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Runtime::with_current(|r| r.poll_timer(cx, self.key)) } } impl Drop for TimerFuture { fn drop(&mut self) { - if let Either::Right(key) = self.key { - Runtime::with_current(|r| r.cancel_timer(key)); - } + Runtime::with_current(|r| r.cancel_timer(self.key)); } } diff --git a/compio-runtime/src/time.rs b/compio-runtime/src/time.rs index 73b88778..7a99dc17 100644 --- a/compio-runtime/src/time.rs +++ b/compio-runtime/src/time.rs @@ -9,8 +9,6 @@ use std::{ use futures_util::{FutureExt, select}; -use crate::Runtime; - /// Waits until `duration` has elapsed. /// /// Equivalent to [`sleep_until(Instant::now() + duration)`](sleep_until). An @@ -55,7 +53,7 @@ pub async fn sleep(duration: Duration) { /// # }) /// ``` pub async fn sleep_until(deadline: Instant) { - Runtime::with_current(|r| r.create_timer(deadline)).await + crate::runtime::create_timer(deadline).await } /// Error returned by [`timeout`] or [`timeout_at`].