Skip to content

Commit

Permalink
fix(runtime): remove handwritten state machine (#361)
Browse files Browse the repository at this point in the history
* fix(runtime): remove handwritten state machine

* ci(check): pin ubuntu version to 22.04
  • Loading branch information
Berrysoft authored Dec 20, 2024
1 parent 8b1937a commit 7ff681e
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-latest, windows-latest, macos-latest ]
os: [ ubuntu-22.04, windows-latest, macos-latest ]
steps:
- uses: actions/checkout@v4
- name: Setup Rust Toolchain
Expand Down
70 changes: 37 additions & 33 deletions compio-runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,12 @@ impl Runtime {
let res = std::panic::catch_unwind(AssertUnwindSafe(f));
BufResult(Ok(0), res)
});
unsafe { self.spawn_unchecked(self.submit(op).map(|res| res.1.into_inner())) }
// It is safe and sound to use `submit` here because the task is spawned
// immediately.
#[allow(deprecated)]
unsafe {
self.spawn_unchecked(self.submit(op).map(|res| res.1.into_inner()))
}
}

/// Attach a raw file descriptor/handle/socket to the runtime.
Expand All @@ -234,7 +239,13 @@ impl Runtime {
/// Submit an operation to the runtime.
///
/// You only need this when authoring your own [`OpCode`].
///
/// It is safe to send the returned future to another runtime and poll it,
/// but the exact behavior is not guaranteed, e.g. it may return pending
/// forever or else.
#[deprecated = "use compio::runtime::submit instead"]
pub fn submit<T: OpCode + 'static>(&self, op: T) -> impl Future<Output = BufResult<usize, T>> {
#[allow(deprecated)]
self.submit_with_flags(op).map(|(res, _)| res)
}

Expand All @@ -244,6 +255,11 @@ impl Runtime {
/// the flags
///
/// You only need this when authoring your own [`OpCode`].
///
/// It is safe to send the returned future to another runtime and poll it,
/// but the exact behavior is not guaranteed, e.g. it may return pending
/// forever or else.
#[deprecated = "use compio::runtime::submit_with_flags instead"]
pub fn submit_with_flags<T: OpCode + 'static>(
&self,
op: T,
Expand All @@ -258,11 +274,6 @@ impl Runtime {
}
}

#[cfg(feature = "time")]
pub(crate) fn create_timer(&self, instant: std::time::Instant) -> impl Future<Output = ()> {
TimerFuture::new(instant)
}

pub(crate) fn cancel_op<T: OpCode>(&self, op: Key<T>) {
self.driver.borrow_mut().cancel(op);
}
Expand All @@ -285,21 +296,6 @@ impl Runtime {
})
}

#[cfg(feature = "time")]
pub(crate) fn register_timer(
&self,
cx: &mut Context,
instant: std::time::Instant,
) -> Option<usize> {
let mut timer_runtime = self.timer_runtime.borrow_mut();
if let Some(key) = timer_runtime.insert(instant) {
timer_runtime.update_waker(key, cx.waker().clone());
Some(key)
} else {
None
}
}

#[cfg(feature = "time")]
pub(crate) fn poll_timer(&self, cx: &mut Context, key: usize) -> Poll<()> {
instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
Expand Down Expand Up @@ -478,29 +474,37 @@ pub fn spawn_blocking<T: Send + 'static>(

/// Submit an operation to the current runtime, and return a future for it.
///
/// It is safe but unspecified behavior to send the returned future to another
/// runtime and poll it.
///
/// ## Panics
///
/// This method doesn't create runtime. It tries to obtain the current runtime
/// by [`Runtime::with_current`].
pub fn submit<T: OpCode + 'static>(op: T) -> impl Future<Output = BufResult<usize, T>> {
Runtime::with_current(|r| r.submit(op))
pub async fn submit<T: OpCode + 'static>(op: T) -> BufResult<usize, T> {
submit_with_flags(op).await.0
}

/// Submit an operation to the current runtime, and return a future for it with
/// flags.
///
/// It is safe but unspecified behavior to send the returned future to another
/// runtime and poll it.
///
/// ## Panics
///
/// This method doesn't create runtime. It tries to obtain the current runtime
/// by [`Runtime::with_current`].
pub fn submit_with_flags<T: OpCode + 'static>(
op: T,
) -> impl Future<Output = (BufResult<usize, T>, u32)> {
Runtime::with_current(|r| r.submit_with_flags(op))
pub async fn submit_with_flags<T: OpCode + 'static>(op: T) -> (BufResult<usize, T>, u32) {
let state = Runtime::with_current(|r| r.submit_raw(op));
match state {
PushEntry::Pending(user_data) => OpFuture::new(user_data).await,
PushEntry::Ready(res) => {
// submit_flags won't be ready immediately, if ready, it must be error without
// flags, or the flags are not necessary
(res, 0)
}
}
}

#[cfg(feature = "time")]
pub(crate) async fn create_timer(instant: std::time::Instant) {
let key = Runtime::with_current(|r| r.timer_runtime.borrow_mut().insert(instant));
if let Some(key) = key {
TimerFuture::new(key).await
}
}
26 changes: 6 additions & 20 deletions compio-runtime/src/runtime/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{
time::{Duration, Instant},
};

use futures_util::future::Either;
use slab::Slab;

use crate::runtime::Runtime;
Expand Down Expand Up @@ -125,39 +124,26 @@ impl TimerRuntime {
}

pub struct TimerFuture {
key: Either<Instant, usize>,
key: usize,
}

impl TimerFuture {
pub fn new(instant: Instant) -> Self {
Self {
key: Either::Left(instant),
}
pub fn new(key: usize) -> Self {
Self { key }
}
}

impl Future for TimerFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Runtime::with_current(|r| match self.key {
Either::Left(instant) => match r.register_timer(cx, instant) {
Some(key) => {
self.key = Either::Right(key);
Poll::Pending
}
None => Poll::Ready(()),
},
Either::Right(key) => r.poll_timer(cx, key),
})
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Runtime::with_current(|r| r.poll_timer(cx, self.key))
}
}

impl Drop for TimerFuture {
fn drop(&mut self) {
if let Either::Right(key) = self.key {
Runtime::with_current(|r| r.cancel_timer(key));
}
Runtime::with_current(|r| r.cancel_timer(self.key));
}
}

Expand Down
4 changes: 1 addition & 3 deletions compio-runtime/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use std::{

use futures_util::{FutureExt, select};

use crate::Runtime;

/// Waits until `duration` has elapsed.
///
/// Equivalent to [`sleep_until(Instant::now() + duration)`](sleep_until). An
Expand Down Expand Up @@ -55,7 +53,7 @@ pub async fn sleep(duration: Duration) {
/// # })
/// ```
pub async fn sleep_until(deadline: Instant) {
Runtime::with_current(|r| r.create_timer(deadline)).await
crate::runtime::create_timer(deadline).await
}

/// Error returned by [`timeout`] or [`timeout_at`].
Expand Down

0 comments on commit 7ff681e

Please sign in to comment.