Skip to content

Commit

Permalink
Merge commit '680b86b17c63b67f768bc5da5f34e5ccf056a0ce' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
maxbrunsfeld committed Sep 10, 2021
2 parents 6afd477 + 680b86b commit 3d4a451
Show file tree
Hide file tree
Showing 14 changed files with 1,041 additions and 435 deletions.
96 changes: 76 additions & 20 deletions gpui/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use async_task::Runnable;
pub use async_task::Task;
use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
use parking_lot::Mutex;
use postage::{barrier, prelude::Stream as _};
use rand::prelude::*;
use smol::{channel, prelude::*, Executor};
use smol::{channel, prelude::*, Executor, Timer};
use std::{
fmt::{self, Debug},
marker::PhantomData,
Expand All @@ -18,7 +19,7 @@ use std::{
},
task::{Context, Poll},
thread,
time::Duration,
time::{Duration, Instant},
};
use waker_fn::waker_fn;

Expand Down Expand Up @@ -49,6 +50,8 @@ struct DeterministicState {
spawned_from_foreground: Vec<(Runnable, Backtrace)>,
forbid_parking: bool,
block_on_ticks: RangeInclusive<usize>,
now: Instant,
pending_timers: Vec<(Instant, barrier::Sender)>,
}

pub struct Deterministic {
Expand All @@ -67,6 +70,8 @@ impl Deterministic {
spawned_from_foreground: Default::default(),
forbid_parking: false,
block_on_ticks: 0..=1000,
now: Instant::now(),
pending_timers: Default::default(),
})),
parker: Default::default(),
}
Expand Down Expand Up @@ -119,17 +124,39 @@ impl Deterministic {
T: 'static,
F: Future<Output = T> + 'static,
{
let woken = Arc::new(AtomicBool::new(false));
let mut future = Box::pin(future);
loop {
if let Some(result) = self.run_internal(woken.clone(), &mut future) {
return result;
}

if !woken.load(SeqCst) && self.state.lock().forbid_parking {
panic!("deterministic executor parked after a call to forbid_parking");
}

woken.store(false, SeqCst);
self.parker.lock().park();
}
}

fn run_until_parked(&self) {
let woken = Arc::new(AtomicBool::new(false));
let future = std::future::pending::<()>();
smol::pin!(future);
self.run_internal(woken, future);
}

pub fn run_internal<F, T>(&self, woken: Arc<AtomicBool>, mut future: F) -> Option<T>
where
T: 'static,
F: Future<Output = T> + Unpin,
{
let unparker = self.parker.lock().unparker();
let woken = Arc::new(AtomicBool::new(false));
let waker = {
let woken = woken.clone();
waker_fn(move || {
woken.store(true, SeqCst);
unparker.unpark();
})
};
let waker = waker_fn(move || {
woken.store(true, SeqCst);
unparker.unpark();
});

let mut cx = Context::from_waker(&waker);
let mut trace = Trace::default();
Expand Down Expand Up @@ -163,23 +190,17 @@ impl Deterministic {
runnable.run();
} else {
drop(state);
if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
return result;
if let Poll::Ready(result) = future.poll(&mut cx) {
return Some(result);
}

let state = self.state.lock();
if state.scheduled_from_foreground.is_empty()
&& state.scheduled_from_background.is_empty()
&& state.spawned_from_foreground.is_empty()
{
if state.forbid_parking && !woken.load(SeqCst) {
panic!("deterministic executor parked after a call to forbid_parking");
}
drop(state);
woken.store(false, SeqCst);
self.parker.lock().park();
return None;
}

continue;
}
}
}
Expand Down Expand Up @@ -407,6 +428,41 @@ impl Foreground {
}
}

pub async fn timer(&self, duration: Duration) {
match self {
Self::Deterministic(executor) => {
let (tx, mut rx) = barrier::channel();
{
let mut state = executor.state.lock();
let wakeup_at = state.now + duration;
state.pending_timers.push((wakeup_at, tx));
}
rx.recv().await;
}
_ => {
Timer::after(duration).await;
}
}
}

pub fn advance_clock(&self, duration: Duration) {
match self {
Self::Deterministic(executor) => {
executor.run_until_parked();

let mut state = executor.state.lock();
state.now += duration;
let now = state.now;
let mut pending_timers = mem::take(&mut state.pending_timers);
drop(state);

pending_timers.retain(|(wakeup, _)| *wakeup > now);
executor.state.lock().pending_timers.extend(pending_timers);
}
_ => panic!("this method can only be called on a deterministic executor"),
}
}

pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
match self {
Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
Expand Down
Loading

0 comments on commit 3d4a451

Please sign in to comment.