Skip to content

Commit

Permalink
fix(runtime): register timer on first poll
Browse files Browse the repository at this point in the history
  • Loading branch information
Berrysoft committed Nov 28, 2024
1 parent 7c9d555 commit befb6ad
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
22 changes: 18 additions & 4 deletions compio-runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,10 @@ impl Runtime {

#[cfg(feature = "time")]
pub(crate) fn create_timer(&self, delay: std::time::Duration) -> impl Future<Output = ()> {
let mut timer_runtime = self.timer_runtime.borrow_mut();
if let Some(key) = timer_runtime.insert(delay) {
Either::Left(TimerFuture::new(key))
} else {
if delay.is_zero() {
Either::Right(std::future::ready(()))
} else {
Either::Left(TimerFuture::new(std::time::Instant::now() + delay))
}
}

Expand All @@ -318,6 +317,21 @@ impl Runtime {
})
}

#[cfg(feature = "time")]
pub(crate) fn register_timer(
&self,
cx: &mut Context,
instant: std::time::Instant,
) -> Option<usize> {
let mut timer_runtime = self.timer_runtime.borrow_mut();
if let Some(key) = timer_runtime.insert(instant) {
timer_runtime.update_waker(key, cx.waker().clone());
Some(key)
} else {
None
}
}

#[cfg(feature = "time")]
pub(crate) fn poll_timer(&self, cx: &mut Context, key: usize) -> Poll<()> {
instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key);
Expand Down
36 changes: 25 additions & 11 deletions compio-runtime/src/runtime/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
time::{Duration, Instant},
};

use futures_util::future::Either;
use slab::Slab;

use crate::runtime::Runtime;
Expand Down Expand Up @@ -70,13 +71,12 @@ impl TimerRuntime {
.unwrap_or_default()
}

pub fn insert(&mut self, mut delay: Duration) -> Option<usize> {
pub fn insert(&mut self, instant: Instant) -> Option<usize> {
let delay = instant - self.time;
if delay.is_zero() {
return None;
}
let elapsed = self.time.elapsed();
let key = self.tasks.insert(FutureState::Active(None));
delay += elapsed;
let entry = TimerEntry { key, delay };
self.wheel.push(Reverse(entry));
Some(key)
Expand Down Expand Up @@ -125,26 +125,39 @@ impl TimerRuntime {
}

pub struct TimerFuture {
key: usize,
key: Either<Instant, usize>,
}

impl TimerFuture {
pub fn new(key: usize) -> Self {
Self { key }
pub fn new(instant: Instant) -> Self {
Self {
key: Either::Left(instant),
}
}
}

impl Future for TimerFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Runtime::with_current(|r| r.poll_timer(cx, self.key))
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Runtime::with_current(|r| match self.key {
Either::Left(instant) => match r.register_timer(cx, instant) {
Some(key) => {
self.key = Either::Right(key);
Poll::Pending
}
None => Poll::Ready(()),
},
Either::Right(key) => r.poll_timer(cx, key),
})
}
}

impl Drop for TimerFuture {
fn drop(&mut self) {
Runtime::with_current(|r| r.cancel_timer(self.key));
if let Either::Right(key) = self.key {
Runtime::with_current(|r| r.cancel_timer(key));
}
}
}

Expand All @@ -153,8 +166,9 @@ fn timer_min_timeout() {
let mut runtime = TimerRuntime::new();
assert_eq!(runtime.min_timeout(), None);

runtime.insert(Duration::from_secs(1));
runtime.insert(Duration::from_secs(10));
let now = Instant::now();
runtime.insert(now + Duration::from_secs(1));
runtime.insert(now + Duration::from_secs(10));
let min_timeout = runtime.min_timeout().unwrap().as_secs_f32();

assert!(min_timeout < 1.);
Expand Down

0 comments on commit befb6ad

Please sign in to comment.