diff --git a/crossbeam-channel/src/flavors/array.rs b/crossbeam-channel/src/flavors/array.rs index 206a05a86..c5db2b304 100644 --- a/crossbeam-channel/src/flavors/array.rs +++ b/crossbeam-channel/src/flavors/array.rs @@ -87,6 +87,18 @@ pub(crate) struct Channel { receivers: SyncWaker, } +/// The state of the channel after calling `start_recv` or `start_send`. +#[derive(PartialEq, Eq)] +enum Status { + /// The channel is ready to read or write to. + Ready, + /// There is currently a send or receive in progress holding up the queue. + /// All operations must block to preserve linearizability. + InProgress, + /// The channel is empty. + Empty, +} + impl Channel { /// Creates a bounded channel of capacity `cap`. pub(crate) fn with_capacity(cap: usize) -> Self { @@ -135,7 +147,7 @@ impl Channel { } /// Attempts to reserve a slot for sending a message. - fn start_send(&self, token: &mut Token) -> bool { + fn start_send(&self, token: &mut Token) -> Status { let backoff = Backoff::new(); let mut tail = self.tail.load(Ordering::Relaxed); @@ -144,7 +156,7 @@ impl Channel { if tail & self.mark_bit != 0 { token.array.slot = ptr::null(); token.array.stamp = 0; - return true; + return Status::Ready; } // Deconstruct the tail. @@ -179,7 +191,7 @@ impl Channel { // Prepare the token for the follow-up call to `write`. token.array.slot = slot as *const Slot as *const u8; token.array.stamp = tail + 1; - return true; + return Status::Ready; } Err(t) => { tail = t; @@ -193,7 +205,14 @@ impl Channel { // If the head lags one lap behind the tail as well... if head.wrapping_add(self.one_lap) == tail { // ...then the channel is full. - return false; + return Status::Empty; + } + + // The head was advanced but the stamp hasn't been updated yet, + // meaning a receive is in-progress. Spin for a bit waiting for + // the receive to complete before falling back to parking. + if backoff.is_completed() { + return Status::InProgress; } backoff.spin(); @@ -225,7 +244,7 @@ impl Channel { } /// Attempts to reserve a slot for receiving a message. - fn start_recv(&self, token: &mut Token) -> bool { + fn start_recv(&self, token: &mut Token) -> Status { let backoff = Backoff::new(); let mut head = self.head.load(Ordering::Relaxed); @@ -262,7 +281,7 @@ impl Channel { // Prepare the token for the follow-up call to `read`. token.array.slot = slot as *const Slot as *const u8; token.array.stamp = head.wrapping_add(self.one_lap); - return true; + return Status::Ready; } Err(h) => { head = h; @@ -280,13 +299,20 @@ impl Channel { // ...then receive an error. token.array.slot = ptr::null(); token.array.stamp = 0; - return true; + return Status::Ready; } else { // Otherwise, the receive operation is not ready. - return false; + return Status::Empty; } } + // The tail was advanced but the stamp hasn't been updated yet, + // meaning a send is in-progress. Spin for a bit waiting for + // the send to complete before falling back to parking. + if backoff.is_completed() { + return Status::InProgress; + } + backoff.spin(); head = self.head.load(Ordering::Relaxed); } else { @@ -317,11 +343,13 @@ impl Channel { /// Attempts to send a message into the channel. pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError> { - let token = &mut Token::default(); - if self.start_send(token) { - unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) } - } else { - Err(TrySendError::Full(msg)) + match self.send_blocking(msg, None, false) { + Ok(None) => Ok(()), + Ok(Some(msg)) => Err(TrySendError::Full(msg)), + Err(SendTimeoutError::Disconnected(msg)) => Err(TrySendError::Disconnected(msg)), + Err(SendTimeoutError::Timeout(_)) => { + unreachable!("called recv_blocking with deadline: None") + } } } @@ -331,14 +359,30 @@ impl Channel { msg: T, deadline: Option, ) -> Result<(), SendTimeoutError> { + self.send_blocking(msg, deadline, true) + .map(|value| assert!(value.is_none(), "called send_blocking with block: true")) + } + + /// Sends a message into the channel. + pub(crate) fn send_blocking( + &self, + msg: T, + deadline: Option, + block: bool, + ) -> Result, SendTimeoutError> { let token = &mut Token::default(); + let mut state = self.senders.start(); loop { // Try sending a message several times. let backoff = Backoff::new(); loop { - if self.start_send(token) { - let res = unsafe { self.write(token, msg) }; - return res.map_err(SendTimeoutError::Disconnected); + match self.start_send(token) { + Status::Ready => { + let res = unsafe { self.write(token, msg) }; + return res.map(|_| None).map_err(SendTimeoutError::Disconnected); + } + Status::Empty if !block => return Ok(Some(msg)), + _ => {} } if backoff.is_completed() { @@ -357,7 +401,7 @@ impl Channel { Context::with(|cx| { // Prepare for blocking until a receiver wakes us up. let oper = Operation::hook(token); - self.senders.register(oper, cx); + self.senders.register2(oper, cx, &state); // Has the channel become ready just now? if !self.is_full() || self.is_disconnected() { @@ -375,30 +419,47 @@ impl Channel { Selected::Operation(_) => {} } }); + + state.unpark(); } } /// Attempts to receive a message without blocking. pub(crate) fn try_recv(&self) -> Result { - let token = &mut Token::default(); - - if self.start_recv(token) { - unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } - } else { - Err(TryRecvError::Empty) + match self.recv_blocking(None, false) { + Ok(Some(value)) => Ok(value), + Ok(None) => Err(TryRecvError::Empty), + Err(RecvTimeoutError::Disconnected) => Err(TryRecvError::Disconnected), + Err(RecvTimeoutError::Timeout) => { + unreachable!("called recv_blocking with deadline: None") + } } } /// Receives a message from the channel. pub(crate) fn recv(&self, deadline: Option) -> Result { + self.recv_blocking(deadline, true) + .map(|value| value.expect("called recv_blocking with block: true")) + } + + pub(crate) fn recv_blocking( + &self, + deadline: Option, + block: bool, + ) -> Result, RecvTimeoutError> { let token = &mut Token::default(); + let mut state = self.receivers.start(); loop { // Try receiving a message several times. let backoff = Backoff::new(); loop { - if self.start_recv(token) { - let res = unsafe { self.read(token) }; - return res.map_err(|_| RecvTimeoutError::Disconnected); + match self.start_recv(token) { + Status::Ready => { + let res = unsafe { self.read(token) }; + return res.map(Some).map_err(|_| RecvTimeoutError::Disconnected); + } + Status::Empty if !block => return Ok(None), + _ => {} } if backoff.is_completed() { @@ -417,7 +478,7 @@ impl Channel { Context::with(|cx| { // Prepare for blocking until a sender wakes us up. let oper = Operation::hook(token); - self.receivers.register(oper, cx); + self.receivers.register2(oper, cx, &mut state); // Has the channel become ready just now? if !self.is_empty() || self.is_disconnected() { @@ -437,6 +498,8 @@ impl Channel { Selected::Operation(_) => {} } }); + + state.unpark(); } } @@ -568,7 +631,7 @@ pub(crate) struct Sender<'a, T>(&'a Channel); impl SelectHandle for Receiver<'_, T> { fn try_select(&self, token: &mut Token) -> bool { - self.0.start_recv(token) + self.0.start_recv(token) == Status::Ready } fn deadline(&self) -> Option { @@ -604,7 +667,7 @@ impl SelectHandle for Receiver<'_, T> { impl SelectHandle for Sender<'_, T> { fn try_select(&self, token: &mut Token) -> bool { - self.0.start_send(token) + self.0.start_send(token) == Status::Ready } fn deadline(&self) -> Option { diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index e86551ad2..bc5e9695b 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -168,6 +168,18 @@ pub(crate) struct Channel { _marker: PhantomData, } +/// The status of the channel after calling `start_recv`. +#[derive(PartialEq, Eq)] +enum Status { + /// The channel has a message ready to read. + Ready, + /// There is currently a send in progress holding up the queue. + /// Both `recv` and `try_recv` must block to preserve linearizability. + InProgress, + /// The channel is empty. + Empty, +} + impl Channel { /// Creates a new unbounded channel. pub(crate) fn new() -> Self { @@ -298,7 +310,7 @@ impl Channel { } /// Attempts to reserve a slot for receiving a message. - fn start_recv(&self, token: &mut Token) -> bool { + fn start_recv(&self, token: &mut Token) -> Status { let backoff = Backoff::new(); let mut head = self.head.index.load(Ordering::Acquire); let mut block = self.head.block.load(Ordering::Acquire); @@ -307,8 +319,14 @@ impl Channel { // Calculate the offset of the index into the block. let offset = (head >> SHIFT) % LAP; - // If we reached the end of the block, wait until the next one is installed. + // We reached the end of the block but the block is not installed yet, meaning + // the last send on previous block is still in progress. The send is likely to + // be soon so we spin here before falling back to blocking. if offset == BLOCK_CAP { + if backoff.is_completed() { + return Status::InProgress; + } + backoff.snooze(); head = self.head.index.load(Ordering::Acquire); block = self.head.block.load(Ordering::Acquire); @@ -327,10 +345,10 @@ impl Channel { if tail & MARK_BIT != 0 { // ...then receive an error. token.list.block = ptr::null(); - return true; + return Status::Ready; } else { // Otherwise, the receive operation is not ready. - return false; + return Status::Empty; } } @@ -340,9 +358,14 @@ impl Channel { } } - // The block can be null here only if the first message is being sent into the channel. - // In that case, just wait until it gets initialized. + // The block can be null here only if the first message sent into the channel is + // in progress. The send is likely to complete soon so we spin here before falling + // back to blocking. if block.is_null() { + if backoff.is_completed() { + return Status::InProgress; + } + backoff.snooze(); head = self.head.index.load(Ordering::Acquire); block = self.head.block.load(Ordering::Acquire); @@ -371,7 +394,7 @@ impl Channel { token.list.block = block as *const u8; token.list.offset = offset; - return true; + return Status::Ready; }, Err(h) => { head = h; @@ -433,26 +456,42 @@ impl Channel { /// Attempts to receive a message without blocking. pub(crate) fn try_recv(&self) -> Result { - let token = &mut Token::default(); - - if self.start_recv(token) { - unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } - } else { - Err(TryRecvError::Empty) + match self.recv_blocking(None, false) { + Ok(Some(value)) => Ok(value), + Ok(None) => Err(TryRecvError::Empty), + Err(RecvTimeoutError::Disconnected) => Err(TryRecvError::Disconnected), + Err(RecvTimeoutError::Timeout) => { + unreachable!("called recv_blocking with deadline: None") + } } } /// Receives a message from the channel. pub(crate) fn recv(&self, deadline: Option) -> Result { + self.recv_blocking(deadline, true) + .map(|value| value.expect("called recv_blocking with block: true")) + } + + /// Receives a message from the channel. + pub(crate) fn recv_blocking( + &self, + deadline: Option, + block: bool, + ) -> Result, RecvTimeoutError> { let token = &mut Token::default(); + + let mut state = self.receivers.start(); loop { // Try receiving a message several times. let backoff = Backoff::new(); loop { - if self.start_recv(token) { - unsafe { - return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); + match self.start_recv(token) { + Status::Ready => { + let res = unsafe { self.read(token) }; + return res.map(Some).map_err(|_| RecvTimeoutError::Disconnected); } + Status::Empty if !block => return Ok(None), + _ => {} } if backoff.is_completed() { @@ -471,7 +510,7 @@ impl Channel { // Prepare for blocking until a sender wakes us up. Context::with(|cx| { let oper = Operation::hook(token); - self.receivers.register(oper, cx); + self.receivers.register2(oper, cx, &state); // Has the channel become ready just now? if !self.is_empty() || self.is_disconnected() { @@ -490,6 +529,8 @@ impl Channel { } Selected::Operation(_) => {} } + + state.unpark(); }); } } @@ -695,7 +736,7 @@ pub(crate) struct Sender<'a, T>(&'a Channel); impl SelectHandle for Receiver<'_, T> { fn try_select(&self, token: &mut Token) -> bool { - self.0.start_recv(token) + self.0.start_recv(token) == Status::Ready } fn deadline(&self) -> Option { diff --git a/crossbeam-channel/src/waker.rs b/crossbeam-channel/src/waker.rs index 7a88c8fdc..99e0960ee 100644 --- a/crossbeam-channel/src/waker.rs +++ b/crossbeam-channel/src/waker.rs @@ -1,7 +1,8 @@ //! Waking mechanism for threads blocked on channel operations. +use core::sync::atomic::AtomicU32; use std::ptr; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::Ordering; use std::sync::Mutex; use std::thread::{self, ThreadId}; use std::vec::Vec; @@ -179,8 +180,8 @@ pub(crate) struct SyncWaker { /// The inner `Waker`. inner: Mutex, - /// `true` if the waker is empty. - is_empty: AtomicBool, + /// Atomic state for this waker. + state: WakerState, } impl SyncWaker { @@ -189,58 +190,52 @@ impl SyncWaker { pub(crate) fn new() -> Self { Self { inner: Mutex::new(Waker::new()), - is_empty: AtomicBool::new(true), + state: WakerState::new(), + } + } + + /// Returns a token that can be used to manage the state of a blocking operation. + pub(crate) fn start(&self) -> BlockingState<'_> { + BlockingState { + is_waker: false, + waker: self, } } /// Registers the current thread with an operation. #[inline] - pub(crate) fn register(&self, oper: Operation, cx: &Context) { - let mut inner = self.inner.lock().unwrap(); - inner.register(oper, cx); - self.is_empty.store( - inner.selectors.is_empty() && inner.observers.is_empty(), - Ordering::SeqCst, - ); + pub(crate) fn register(&self, oper: Operation, cx: &Context, state: &BlockingState<'_>) { + self.inner.lock().unwrap().register(oper, cx); + self.state.park(state.is_waker); } /// Unregisters an operation previously registered by the current thread. #[inline] pub(crate) fn unregister(&self, oper: Operation) -> Option { - let mut inner = self.inner.lock().unwrap(); - let entry = inner.unregister(oper); - self.is_empty.store( - inner.selectors.is_empty() && inner.observers.is_empty(), - Ordering::SeqCst, - ); - entry + self.inner.lock().unwrap().unregister(oper) } /// Attempts to find one thread (not the current one), select its operation, and wake it up. #[inline] pub(crate) fn notify(&self) { - if !self.is_empty.load(Ordering::SeqCst) { - let mut inner = self.inner.lock().unwrap(); - if !self.is_empty.load(Ordering::SeqCst) { - inner.try_select(); - inner.notify(); - self.is_empty.store( - inner.selectors.is_empty() && inner.observers.is_empty(), - Ordering::SeqCst, - ); - } + if self.state.try_notify() { + self.notify_one() } } + // Finds a thread (not the current one), select its operation, and wake it up. + #[inline] + pub(crate) fn notify_one(&self) { + let mut inner = self.inner.lock().unwrap(); + inner.try_select(); + inner.notify(); + } + /// Registers an operation waiting to be ready. #[inline] pub(crate) fn watch(&self, oper: Operation, cx: &Context) { - let mut inner = self.inner.lock().unwrap(); - inner.watch(oper, cx); - self.is_empty.store( - inner.selectors.is_empty() && inner.observers.is_empty(), - Ordering::SeqCst, - ); + self.inner.lock().unwrap().watch(oper, cx); + self.state.park(false); } /// Unregisters an operation waiting to be ready. @@ -248,10 +243,6 @@ impl SyncWaker { pub(crate) fn unwatch(&self, oper: Operation) { let mut inner = self.inner.lock().unwrap(); inner.unwatch(oper); - self.is_empty.store( - inner.selectors.is_empty() && inner.observers.is_empty(), - Ordering::SeqCst, - ); } /// Notifies all threads that the channel is disconnected. @@ -259,17 +250,144 @@ impl SyncWaker { pub(crate) fn disconnect(&self) { let mut inner = self.inner.lock().unwrap(); inner.disconnect(); - self.is_empty.store( - inner.selectors.is_empty() && inner.observers.is_empty(), - Ordering::SeqCst, - ); } } impl Drop for SyncWaker { #[inline] fn drop(&mut self) { - debug_assert!(self.is_empty.load(Ordering::SeqCst)); + debug_assert!(!self.state.has_waiters()); + } +} + +/// A guard that manages the state of a blocking operation. +#[derive(Clone)] +struct BlockingState<'a> { + /// True if this thread is the waker thread, meaning it must + /// try to notify waiters after it completes. + is_waker: bool, + + waker: &'a SyncWaker, +} + +impl BlockingState<'_> { + /// Reset the state after waking up from parking. + #[inline] + pub(crate) fn unpark(&mut self) { + self.is_waker = self.waker.state.unpark(); + } + + /// Reset the state of an observer after waking up from parking. + #[inline] + pub(crate) fn unpark_observer(&mut self) { + self.waker.state.unpark_observer(); + } +} + +impl Drop for BlockingState<'_> { + fn drop(&mut self) { + if self.is_waker && self.waker.state.drop_waker() { + self.waker.notify_one(); + } + } +} + +const NOTIFIED: u32 = 0b001; +const WAKER: u32 = 0b010; + +/// The state of a `SyncWaker`. +struct WakerState { + state: AtomicU32, +} + +impl WakerState { + /// Initialize the waker state. + fn new() -> WakerState { + WakerState { + state: AtomicU32::new(0), + } + } + + /// Returns whether or not a waiter needs to be notified. + fn try_notify(&self) -> bool { + // because storing a value in the channel is also sequentially consistent, + // this creates a total order between storing a value and registering a waiter. + let state = self.state.load(Ordering::SeqCst); + + // if a notification is already set, the waker thread will take care + // of further notifications. otherwise we have to notify if there are waiters + if ((state >> NOTIFIED) & (state & NOTIFIED)) > 0 { + return self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { + // set the notification if there are waiters and it is not already set + if (state >> WAKER) > 0 && (state & NOTIFIED == 0) { + Some(state | (WAKER | NOTIFIED)) + } else { + None + } + }) + .is_ok(); + } + + false + } + + /// Get ready for this waker to park. The channel should be checked after calling this + /// method, and before parking. + fn park(&self, waker: bool) { + // increment the waiter count. if we are the waker thread, we also have to remove the + // notification to allow other waiters to be notified after we park + let update = (1_u32 << WAKER).wrapping_sub(u32::from(waker)); + self.state.fetch_add(update, Ordering::SeqCst); + } + + /// Remove an observer from the waker state after it was unparked. + /// + /// Observers never become the waking thread because if there were waiters, they + /// are also woken up along with any observers. + fn unpark_observer(&self) { + self.state.fetch_sub(1_u32 << WAKER, Ordering::SeqCst); + } + + /// Remove this waiter from the waker state after it was unparked. + /// + /// Returns `true` if this thread became the waking thread and must call `drop_waker` + /// after it completes it's operation. + fn unpark(&self) -> bool { + self.state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { + // decrement the waiter count and consume the waker token + Some((state - (1 << WAKER)) & !WAKER) + }) + // did we consume the token and become the waker thread? + .map(|state| state & WAKER != 0) + .unwrap() + } + + /// Called by the waking thread after completing it's operation. + /// + /// Returns `true` if a waiter should be notified. + fn drop_waker(&self) -> bool { + self.state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { + // if there are waiters, set the waker token and wake someone, transferring the + // waker thread. otherwise unset the notification so new waiters can synchronize + // with new notifications + Some(if (state >> WAKER) > 0 { + state | WAKER + } else { + state.wrapping_sub(NOTIFIED) + }) + }) + // were there waiters? + .map(|state| (state >> WAKER) > 0) + .unwrap() + } + + /// Returns `true` if there are active waiters. + fn has_waiters(&self) -> bool { + (self.state.load(Ordering::Relaxed) >> WAKER) > 0 } } diff --git a/crossbeam-channel/tests/array.rs b/crossbeam-channel/tests/array.rs index 486f56a78..74bd30258 100644 --- a/crossbeam-channel/tests/array.rs +++ b/crossbeam-channel/tests/array.rs @@ -427,6 +427,7 @@ fn stress_oneshot() { } } +// TODO: failing sometimes #[test] fn stress_iter() { #[cfg(miri)] diff --git a/crossbeam-utils/src/backoff.rs b/crossbeam-utils/src/backoff.rs index 9729ce695..9cfce48f7 100644 --- a/crossbeam-utils/src/backoff.rs +++ b/crossbeam-utils/src/backoff.rs @@ -269,7 +269,16 @@ impl Backoff { /// [`AtomicBool`]: std::sync::atomic::AtomicBool #[inline] pub fn is_completed(&self) -> bool { - self.step.get() > YIELD_LIMIT + #[cfg(not(test))] + { + self.step.get() > YIELD_LIMIT + } + + // avoid spinning during tests, which can hide bugs in the waker + #[cfg(test)] + { + true + } } }