diff --git a/crossbeam-channel/src/channel.rs b/crossbeam-channel/src/channel.rs index 5447e3303..5aafa6c28 100644 --- a/crossbeam-channel/src/channel.rs +++ b/crossbeam-channel/src/channel.rs @@ -14,6 +14,7 @@ use crate::err::{ }; use crate::flavors; use crate::select::{Operation, SelectHandle, Token}; +use crate::waker::BlockingState; /// Creates a multi-producer multi-consumer channel of unbounded capacity. /// @@ -1358,6 +1359,14 @@ impl fmt::Debug for IntoIter { } impl SelectHandle for Sender { + fn start(&self) -> Option> { + match &self.flavor { + SenderFlavor::Array(chan) => chan.sender().start_ref(), + SenderFlavor::List(chan) => chan.sender().start_ref(), + SenderFlavor::Zero(chan) => chan.start(), + } + } + fn try_select(&self, token: &mut Token) -> bool { match &self.flavor { SenderFlavor::Array(chan) => chan.sender().try_select(token), @@ -1370,11 +1379,11 @@ impl SelectHandle for Sender { None } - fn register(&self, oper: Operation, cx: &Context) -> bool { + fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { match &self.flavor { - SenderFlavor::Array(chan) => chan.sender().register(oper, cx), - SenderFlavor::List(chan) => chan.sender().register(oper, cx), - SenderFlavor::Zero(chan) => chan.sender().register(oper, cx), + SenderFlavor::Array(chan) => chan.sender().register(oper, cx, state), + SenderFlavor::List(chan) => chan.sender().register(oper, cx, state), + SenderFlavor::Zero(chan) => chan.sender().register(oper, cx, state), } } @@ -1402,11 +1411,11 @@ impl SelectHandle for Sender { } } - fn watch(&self, oper: Operation, cx: &Context) -> bool { + fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { match &self.flavor { - SenderFlavor::Array(chan) => chan.sender().watch(oper, cx), - SenderFlavor::List(chan) => chan.sender().watch(oper, cx), - SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx), + SenderFlavor::Array(chan) => chan.sender().watch(oper, cx, state), + SenderFlavor::List(chan) => chan.sender().watch(oper, cx, state), + SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx, state), } } @@ -1420,6 +1429,17 @@ impl SelectHandle for Sender { } impl SelectHandle for Receiver { + fn start(&self) -> Option> { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.receiver().start_ref(), + ReceiverFlavor::List(chan) => chan.receiver().start_ref(), + ReceiverFlavor::Zero(chan) => chan.start(), + ReceiverFlavor::At(chan) => chan.start(), + ReceiverFlavor::Tick(chan) => chan.start(), + ReceiverFlavor::Never(chan) => chan.start(), + } + } + fn try_select(&self, token: &mut Token) -> bool { match &self.flavor { ReceiverFlavor::Array(chan) => chan.receiver().try_select(token), @@ -1442,14 +1462,14 @@ impl SelectHandle for Receiver { } } - fn register(&self, oper: Operation, cx: &Context) -> bool { + fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { match &self.flavor { - ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx), - ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx), - ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx), - ReceiverFlavor::At(chan) => chan.register(oper, cx), - ReceiverFlavor::Tick(chan) => chan.register(oper, cx), - ReceiverFlavor::Never(chan) => chan.register(oper, cx), + ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx, state), + ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx, state), + ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx, state), + ReceiverFlavor::At(chan) => chan.register(oper, cx, state), + ReceiverFlavor::Tick(chan) => chan.register(oper, cx, state), + ReceiverFlavor::Never(chan) => chan.register(oper, cx, state), } } @@ -1486,14 +1506,14 @@ impl SelectHandle for Receiver { } } - fn watch(&self, oper: Operation, cx: &Context) -> bool { + fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { match &self.flavor { - ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx), - ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx), - ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx), - ReceiverFlavor::At(chan) => chan.watch(oper, cx), - ReceiverFlavor::Tick(chan) => chan.watch(oper, cx), - ReceiverFlavor::Never(chan) => chan.watch(oper, cx), + ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx, state), + ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx, state), + ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx, state), + ReceiverFlavor::At(chan) => chan.watch(oper, cx, state), + ReceiverFlavor::Tick(chan) => chan.watch(oper, cx, state), + ReceiverFlavor::Never(chan) => chan.watch(oper, cx, state), } } diff --git a/crossbeam-channel/src/flavors/array.rs b/crossbeam-channel/src/flavors/array.rs index c5db2b304..2f9c2f3e9 100644 --- a/crossbeam-channel/src/flavors/array.rs +++ b/crossbeam-channel/src/flavors/array.rs @@ -20,7 +20,7 @@ use crossbeam_utils::{Backoff, CachePadded}; use crate::context::Context; use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use crate::select::{Operation, SelectHandle, Selected, Token}; -use crate::waker::SyncWaker; +use crate::waker::{BlockingState, SyncWaker}; /// A slot in a channel. struct Slot { @@ -401,7 +401,7 @@ impl Channel { Context::with(|cx| { // Prepare for blocking until a receiver wakes us up. let oper = Operation::hook(token); - self.senders.register2(oper, cx, &state); + self.senders.register(oper, cx, &state); // Has the channel become ready just now? if !self.is_full() || self.is_disconnected() { @@ -478,7 +478,7 @@ impl Channel { Context::with(|cx| { // Prepare for blocking until a sender wakes us up. let oper = Operation::hook(token); - self.receivers.register2(oper, cx, &mut state); + self.receivers.register(oper, cx, &state); // Has the channel become ready just now? if !self.is_empty() || self.is_disconnected() { @@ -629,7 +629,18 @@ pub(crate) struct Receiver<'a, T>(&'a Channel); /// Sender handle to a channel. pub(crate) struct Sender<'a, T>(&'a Channel); +impl<'a, T> Receiver<'a, T> { + /// Same as `SelectHandle::start`, but with a more specific lifetime. + pub(crate) fn start_ref(&self) -> Option> { + Some(self.0.receivers.start()) + } +} + impl SelectHandle for Receiver<'_, T> { + fn start(&self) -> Option> { + self.start_ref() + } + fn try_select(&self, token: &mut Token) -> bool { self.0.start_recv(token) == Status::Ready } @@ -638,8 +649,9 @@ impl SelectHandle for Receiver<'_, T> { None } - fn register(&self, oper: Operation, cx: &Context) -> bool { - self.0.receivers.register(oper, cx); + fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { + let state = state.expect("Receiver::start returns blocking state"); + self.0.receivers.register(oper, cx, state); self.is_ready() } @@ -655,8 +667,9 @@ impl SelectHandle for Receiver<'_, T> { !self.0.is_empty() || self.0.is_disconnected() } - fn watch(&self, oper: Operation, cx: &Context) -> bool { - self.0.receivers.watch(oper, cx); + fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { + let state = state.expect("Receiver::start returns blocking state"); + self.0.receivers.watch(oper, cx, state); self.is_ready() } @@ -665,7 +678,18 @@ impl SelectHandle for Receiver<'_, T> { } } +impl<'a, T> Sender<'a, T> { + /// Same as `SelectHandle::start`, but with a more specific lifetime. + pub(crate) fn start_ref(&self) -> Option> { + Some(self.0.senders.start()) + } +} + impl SelectHandle for Sender<'_, T> { + fn start(&self) -> Option> { + self.start_ref() + } + fn try_select(&self, token: &mut Token) -> bool { self.0.start_send(token) == Status::Ready } @@ -674,8 +698,9 @@ impl SelectHandle for Sender<'_, T> { None } - fn register(&self, oper: Operation, cx: &Context) -> bool { - self.0.senders.register(oper, cx); + fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { + let state = state.expect("Sender::start returns blocking state"); + self.0.senders.register(oper, cx, state); self.is_ready() } @@ -691,8 +716,9 @@ impl SelectHandle for Sender<'_, T> { !self.0.is_full() || self.0.is_disconnected() } - fn watch(&self, oper: Operation, cx: &Context) -> bool { - self.0.senders.watch(oper, cx); + fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { + let state = state.expect("Sender::start returns blocking state"); + self.0.senders.watch(oper, cx, state); self.is_ready() } diff --git a/crossbeam-channel/src/flavors/at.rs b/crossbeam-channel/src/flavors/at.rs index 83e69c1ed..177d5d1a7 100644 --- a/crossbeam-channel/src/flavors/at.rs +++ b/crossbeam-channel/src/flavors/at.rs @@ -10,6 +10,7 @@ use crate::context::Context; use crate::err::{RecvTimeoutError, TryRecvError}; use crate::select::{Operation, SelectHandle, Token}; use crate::utils; +use crate::waker::BlockingState; /// Result of a receive operation. pub(crate) type AtToken = Option; @@ -140,6 +141,10 @@ impl Channel { } impl SelectHandle for Channel { + fn start(&self) -> Option> { + None + } + #[inline] fn try_select(&self, token: &mut Token) -> bool { match self.try_recv() { @@ -166,7 +171,12 @@ impl SelectHandle for Channel { } #[inline] - fn register(&self, _oper: Operation, _cx: &Context) -> bool { + fn register( + &self, + _oper: Operation, + _cx: &Context, + _state: Option<&BlockingState<'_>>, + ) -> bool { self.is_ready() } @@ -184,7 +194,7 @@ impl SelectHandle for Channel { } #[inline] - fn watch(&self, _oper: Operation, _cx: &Context) -> bool { + fn watch(&self, _oper: Operation, _cx: &Context, _state: Option<&BlockingState<'_>>) -> bool { self.is_ready() } diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index bc5e9695b..04aeb866a 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -13,7 +13,7 @@ use crossbeam_utils::{Backoff, CachePadded}; use crate::context::Context; use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use crate::select::{Operation, SelectHandle, Selected, Token}; -use crate::waker::SyncWaker; +use crate::waker::{BlockingState, SyncWaker}; // TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the // following changes by @kleimkuhler: @@ -510,7 +510,7 @@ impl Channel { // Prepare for blocking until a sender wakes us up. Context::with(|cx| { let oper = Operation::hook(token); - self.receivers.register2(oper, cx, &state); + self.receivers.register(oper, cx, &state); // Has the channel become ready just now? if !self.is_empty() || self.is_disconnected() { @@ -734,7 +734,18 @@ pub(crate) struct Receiver<'a, T>(&'a Channel); /// Sender handle to a channel. pub(crate) struct Sender<'a, T>(&'a Channel); +impl<'a, T> Receiver<'a, T> { + /// Same as `SelectHandle::start`, but with a more specific lifetime. + pub(crate) fn start_ref(&self) -> Option> { + Some(self.0.receivers.start()) + } +} + impl SelectHandle for Receiver<'_, T> { + fn start(&self) -> Option> { + self.start_ref() + } + fn try_select(&self, token: &mut Token) -> bool { self.0.start_recv(token) == Status::Ready } @@ -743,8 +754,9 @@ impl SelectHandle for Receiver<'_, T> { None } - fn register(&self, oper: Operation, cx: &Context) -> bool { - self.0.receivers.register(oper, cx); + fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { + let state = state.expect("Receiver::start returns blocking state"); + self.0.receivers.register(oper, cx, state); self.is_ready() } @@ -760,8 +772,9 @@ impl SelectHandle for Receiver<'_, T> { !self.0.is_empty() || self.0.is_disconnected() } - fn watch(&self, oper: Operation, cx: &Context) -> bool { - self.0.receivers.watch(oper, cx); + fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { + let state = state.expect("Receiver::start returns blocking state"); + self.0.receivers.watch(oper, cx, state); self.is_ready() } @@ -770,7 +783,18 @@ impl SelectHandle for Receiver<'_, T> { } } -impl SelectHandle for Sender<'_, T> { +impl<'a, T> Sender<'a, T> { + /// Same as `SelectHandle::start`, but with a more specific lifetime. + pub(crate) fn start_ref(&self) -> Option> { + None + } +} + +impl<'a, T> SelectHandle for Sender<'a, T> { + fn start(&self) -> Option> { + None + } + fn try_select(&self, token: &mut Token) -> bool { self.0.start_send(token) } @@ -779,7 +803,12 @@ impl SelectHandle for Sender<'_, T> { None } - fn register(&self, _oper: Operation, _cx: &Context) -> bool { + fn register( + &self, + _oper: Operation, + _cx: &Context, + _state: Option<&BlockingState<'_>>, + ) -> bool { self.is_ready() } @@ -793,7 +822,7 @@ impl SelectHandle for Sender<'_, T> { true } - fn watch(&self, _oper: Operation, _cx: &Context) -> bool { + fn watch(&self, _oper: Operation, _cx: &Context, _state: Option<&BlockingState<'_>>) -> bool { self.is_ready() } diff --git a/crossbeam-channel/src/flavors/never.rs b/crossbeam-channel/src/flavors/never.rs index 7a9f830ac..0a985c548 100644 --- a/crossbeam-channel/src/flavors/never.rs +++ b/crossbeam-channel/src/flavors/never.rs @@ -9,6 +9,7 @@ use crate::context::Context; use crate::err::{RecvTimeoutError, TryRecvError}; use crate::select::{Operation, SelectHandle, Token}; use crate::utils; +use crate::waker::BlockingState; /// This flavor doesn't need a token. pub(crate) type NeverToken = (); @@ -72,6 +73,10 @@ impl Channel { } impl SelectHandle for Channel { + fn start(&self) -> Option> { + None + } + #[inline] fn try_select(&self, _token: &mut Token) -> bool { false @@ -83,7 +88,12 @@ impl SelectHandle for Channel { } #[inline] - fn register(&self, _oper: Operation, _cx: &Context) -> bool { + fn register( + &self, + _oper: Operation, + _cx: &Context, + _state: Option<&BlockingState<'_>>, + ) -> bool { self.is_ready() } @@ -101,7 +111,7 @@ impl SelectHandle for Channel { } #[inline] - fn watch(&self, _oper: Operation, _cx: &Context) -> bool { + fn watch(&self, _oper: Operation, _cx: &Context, _state: Option<&BlockingState<'_>>) -> bool { self.is_ready() } diff --git a/crossbeam-channel/src/flavors/tick.rs b/crossbeam-channel/src/flavors/tick.rs index a5b67ed9e..e9feacfee 100644 --- a/crossbeam-channel/src/flavors/tick.rs +++ b/crossbeam-channel/src/flavors/tick.rs @@ -10,6 +10,7 @@ use crossbeam_utils::atomic::AtomicCell; use crate::context::Context; use crate::err::{RecvTimeoutError, TryRecvError}; use crate::select::{Operation, SelectHandle, Token}; +use crate::waker::BlockingState; /// Result of a receive operation. pub(crate) type TickToken = Option; @@ -115,6 +116,10 @@ impl Channel { } impl SelectHandle for Channel { + fn start(&self) -> Option> { + None + } + #[inline] fn try_select(&self, token: &mut Token) -> bool { match self.try_recv() { @@ -136,7 +141,12 @@ impl SelectHandle for Channel { } #[inline] - fn register(&self, _oper: Operation, _cx: &Context) -> bool { + fn register( + &self, + _oper: Operation, + _cx: &Context, + _state: Option<&BlockingState<'_>>, + ) -> bool { self.is_ready() } @@ -154,7 +164,7 @@ impl SelectHandle for Channel { } #[inline] - fn watch(&self, _oper: Operation, _cx: &Context) -> bool { + fn watch(&self, _oper: Operation, _cx: &Context, _state: Option<&BlockingState<'_>>) -> bool { self.is_ready() } diff --git a/crossbeam-channel/src/flavors/zero.rs b/crossbeam-channel/src/flavors/zero.rs index 08d226f87..dfa9f089f 100644 --- a/crossbeam-channel/src/flavors/zero.rs +++ b/crossbeam-channel/src/flavors/zero.rs @@ -15,7 +15,7 @@ use crossbeam_utils::Backoff; use crate::context::Context; use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use crate::select::{Operation, SelectHandle, Selected, Token}; -use crate::waker::Waker; +use crate::waker::{BlockingState, Waker}; /// A pointer to a packet. pub(crate) struct ZeroToken(*mut ()); @@ -387,6 +387,10 @@ impl Channel { pub(crate) fn is_full(&self) -> bool { true } + + pub(crate) fn start(&self) -> Option> { + None + } } /// Receiver handle to a channel. @@ -396,6 +400,10 @@ pub(crate) struct Receiver<'a, T>(&'a Channel); pub(crate) struct Sender<'a, T>(&'a Channel); impl SelectHandle for Receiver<'_, T> { + fn start(&self) -> Option> { + None + } + fn try_select(&self, token: &mut Token) -> bool { self.0.start_recv(token) } @@ -404,7 +412,7 @@ impl SelectHandle for Receiver<'_, T> { None } - fn register(&self, oper: Operation, cx: &Context) -> bool { + fn register(&self, oper: Operation, cx: &Context, _state: Option<&BlockingState<'_>>) -> bool { let packet = Box::into_raw(Packet::::empty_on_heap()); let mut inner = self.0.inner.lock().unwrap(); @@ -433,7 +441,7 @@ impl SelectHandle for Receiver<'_, T> { inner.senders.can_select() || inner.is_disconnected } - fn watch(&self, oper: Operation, cx: &Context) -> bool { + fn watch(&self, oper: Operation, cx: &Context, _state: Option<&BlockingState<'_>>) -> bool { let mut inner = self.0.inner.lock().unwrap(); inner.receivers.watch(oper, cx); inner.senders.can_select() || inner.is_disconnected @@ -445,7 +453,11 @@ impl SelectHandle for Receiver<'_, T> { } } -impl SelectHandle for Sender<'_, T> { +impl<'a, T> SelectHandle for Sender<'a, T> { + fn start(&self) -> Option> { + None + } + fn try_select(&self, token: &mut Token) -> bool { self.0.start_send(token) } @@ -454,7 +466,7 @@ impl SelectHandle for Sender<'_, T> { None } - fn register(&self, oper: Operation, cx: &Context) -> bool { + fn register(&self, oper: Operation, cx: &Context, _state: Option<&BlockingState<'_>>) -> bool { let packet = Box::into_raw(Packet::::empty_on_heap()); let mut inner = self.0.inner.lock().unwrap(); @@ -483,7 +495,7 @@ impl SelectHandle for Sender<'_, T> { inner.receivers.can_select() || inner.is_disconnected } - fn watch(&self, oper: Operation, cx: &Context) -> bool { + fn watch(&self, oper: Operation, cx: &Context, _state: Option<&BlockingState<'_>>) -> bool { let mut inner = self.0.inner.lock().unwrap(); inner.senders.watch(oper, cx); inner.receivers.can_select() || inner.is_disconnected diff --git a/crossbeam-channel/src/lib.rs b/crossbeam-channel/src/lib.rs index 35876c160..7faaa732b 100644 --- a/crossbeam-channel/src/lib.rs +++ b/crossbeam-channel/src/lib.rs @@ -362,6 +362,7 @@ mod waker; #[cfg(feature = "std")] pub mod internal { pub use crate::select::{select, select_timeout, try_select, SelectHandle}; + pub use crate::waker::BlockingState; } #[cfg(feature = "std")] diff --git a/crossbeam-channel/src/select.rs b/crossbeam-channel/src/select.rs index ac9e408d3..52df580cc 100644 --- a/crossbeam-channel/src/select.rs +++ b/crossbeam-channel/src/select.rs @@ -15,6 +15,7 @@ use crate::err::{RecvError, SendError}; use crate::err::{SelectTimeoutError, TrySelectError}; use crate::flavors; use crate::utils; +use crate::waker::BlockingState; /// Temporary data that gets initialized during select or a blocking operation, and is consumed by /// `read` or `write`. @@ -98,6 +99,9 @@ impl From for usize { /// appropriate deadline for blocking, etc. // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. pub trait SelectHandle { + /// Returns a guard that manages the state of the operation. + fn start(&self) -> Option>; + /// Attempts to select an operation and returns `true` on success. fn try_select(&self, token: &mut Token) -> bool; @@ -105,7 +109,7 @@ pub trait SelectHandle { fn deadline(&self) -> Option; /// Registers an operation for execution and returns `true` if it is now ready. - fn register(&self, oper: Operation, cx: &Context) -> bool; + fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool; /// Unregisters an operation for execution. fn unregister(&self, oper: Operation); @@ -117,13 +121,17 @@ pub trait SelectHandle { fn is_ready(&self) -> bool; /// Registers an operation for readiness notification and returns `true` if it is now ready. - fn watch(&self, oper: Operation, cx: &Context) -> bool; + fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool; /// Unregisters an operation for readiness notification. fn unwatch(&self, oper: Operation); } impl SelectHandle for &T { + fn start(&self) -> Option> { + (**self).start() + } + fn try_select(&self, token: &mut Token) -> bool { (**self).try_select(token) } @@ -132,8 +140,8 @@ impl SelectHandle for &T { (**self).deadline() } - fn register(&self, oper: Operation, cx: &Context) -> bool { - (**self).register(oper, cx) + fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { + (**self).register(oper, cx, state) } fn unregister(&self, oper: Operation) { @@ -148,8 +156,8 @@ impl SelectHandle for &T { (**self).is_ready() } - fn watch(&self, oper: Operation, cx: &Context) -> bool { - (**self).watch(oper, cx) + fn watch(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool { + (**self).watch(oper, cx, state) } fn unwatch(&self, oper: Operation) { @@ -157,6 +165,46 @@ impl SelectHandle for &T { } } +// A dummy handle used to initialize the handles array by the select! macro. +impl SelectHandle for () { + fn start(&self) -> Option> { + None + } + + fn try_select(&self, _token: &mut Token) -> bool { + false + } + + fn deadline(&self) -> Option { + None + } + + fn register( + &self, + _oper: Operation, + _cx: &Context, + _state: Option<&BlockingState<'_>>, + ) -> bool { + false + } + + fn unregister(&self, _oper: Operation) {} + + fn accept(&self, _token: &mut Token, _cx: &Context) -> bool { + false + } + + fn is_ready(&self) -> bool { + false + } + + fn watch(&self, _oper: Operation, _cx: &Context, _state: Option<&BlockingState<'_>>) -> bool { + false + } + + fn unwatch(&self, _oper: Operation) {} +} + /// Determines when a select operation should time out. #[derive(Clone, Copy, Eq, PartialEq)] enum Timeout { @@ -175,7 +223,12 @@ enum Timeout { /// Successful receive operations will have to be followed up by `channel::read()` and successful /// send operations by `channel::write()`. fn run_select( - handles: &mut [(&dyn SelectHandle, usize, *const u8)], + handles: &mut [( + &dyn SelectHandle, + Option>, + usize, + *const u8, + )], timeout: Timeout, ) -> Option<(Token, usize, *const u8)> { if handles.is_empty() { @@ -202,7 +255,7 @@ fn run_select( let mut token = Token::default(); // Try selecting one of the operations without blocking. - for &(handle, i, ptr) in handles.iter() { + for &(handle, _, i, ptr) in handles.iter() { if handle.try_select(&mut token) { return Some((token, i, ptr)); } @@ -220,11 +273,15 @@ fn run_select( } // Register all operations. - for (handle, i, _) in handles.iter_mut() { + for (handle, state, i, _) in handles.iter_mut() { registered_count += 1; // If registration returns `false`, that means the operation has just become ready. - if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) { + if handle.register( + Operation::hook::<&dyn SelectHandle>(handle), + cx, + state.as_ref(), + ) { // Try aborting select. sel = match cx.try_select(Selected::Aborted) { Ok(()) => { @@ -251,7 +308,7 @@ fn run_select( Timeout::Never => None, Timeout::At(when) => Some(when), }; - for &(handle, _, _) in handles.iter() { + for &(handle, _, _, _) in handles.iter() { if let Some(x) = handle.deadline() { deadline = deadline.map(|y| x.min(y)).or(Some(x)); } @@ -262,8 +319,12 @@ fn run_select( } // Unregister all registered operations. - for (handle, _, _) in handles.iter_mut().take(registered_count) { + for (handle, state, _, _) in handles.iter_mut().take(registered_count) { handle.unregister(Operation::hook::<&dyn SelectHandle>(handle)); + + if let Some(state) = state { + state.unpark(); + } } match sel { @@ -271,7 +332,7 @@ fn run_select( Selected::Aborted => { // If an operation became ready during registration, try selecting it. if let Some(index_ready) = index_ready { - for &(handle, i, ptr) in handles.iter() { + for &(handle, _, i, ptr) in handles.iter() { if i == index_ready && handle.try_select(&mut token) { return Some((i, ptr)); } @@ -281,7 +342,7 @@ fn run_select( Selected::Disconnected => {} Selected::Operation(_) => { // Find the selected operation. - for (handle, i, ptr) in handles.iter_mut() { + for (handle, _, i, ptr) in handles.iter_mut() { // Is this the selected operation? if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle)) { @@ -303,7 +364,7 @@ fn run_select( } // Try selecting one of the operations without blocking. - for &(handle, i, ptr) in handles.iter() { + for &(handle, _, i, ptr) in handles.iter() { if handle.try_select(&mut token) { return Some((token, i, ptr)); } @@ -323,7 +384,12 @@ fn run_select( /// Runs until one of the operations becomes ready, potentially blocking the current thread. fn run_ready( - handles: &mut [(&dyn SelectHandle, usize, *const u8)], + handles: &mut [( + &dyn SelectHandle, + Option>, + usize, + *const u8, + )], timeout: Timeout, ) -> Option { if handles.is_empty() { @@ -348,7 +414,7 @@ fn run_ready( let backoff = Backoff::new(); loop { // Check operations for readiness. - for &(handle, i, _) in handles.iter() { + for &(handle, _, i, _) in handles.iter() { if handle.is_ready() { return Some(i); } @@ -378,12 +444,12 @@ fn run_ready( let mut registered_count = 0; // Begin watching all operations. - for (handle, _, _) in handles.iter_mut() { + for (handle, state, _, _) in handles.iter_mut() { registered_count += 1; let oper = Operation::hook::<&dyn SelectHandle>(handle); // If registration returns `false`, that means the operation has just become ready. - if handle.watch(oper, cx) { + if handle.watch(oper, cx, state.as_ref()) { sel = match cx.try_select(Selected::Operation(oper)) { Ok(()) => Selected::Operation(oper), Err(s) => s, @@ -406,7 +472,7 @@ fn run_ready( Timeout::Never => None, Timeout::At(when) => Some(when), }; - for &(handle, _, _) in handles.iter() { + for &(handle, _, _, _) in handles.iter() { if let Some(x) = handle.deadline() { deadline = deadline.map(|y| x.min(y)).or(Some(x)); } @@ -417,8 +483,11 @@ fn run_ready( } // Unwatch all operations. - for (handle, _, _) in handles.iter_mut().take(registered_count) { + for (handle, state, _, _) in handles.iter_mut().take(registered_count) { handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle)); + if let Some(state) = state { + state.unpark(); + } } match sel { @@ -426,7 +495,7 @@ fn run_ready( Selected::Aborted => {} Selected::Disconnected => {} Selected::Operation(_) => { - for (handle, i, _) in handles.iter_mut() { + for (handle, _, i, _) in handles.iter_mut() { let oper = Operation::hook::<&dyn SelectHandle>(handle); if sel == Selected::Operation(oper) { return Some(*i); @@ -449,7 +518,12 @@ fn run_ready( // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn try_select<'a>( - handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], + handles: &mut [( + &'a dyn SelectHandle, + Option>, + usize, + *const u8, + )], ) -> Result, TrySelectError> { match run_select(handles, Timeout::Now) { None => Err(TrySelectError), @@ -466,7 +540,12 @@ pub fn try_select<'a>( // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn select<'a>( - handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], + handles: &mut [( + &'a dyn SelectHandle, + Option>, + usize, + *const u8, + )], ) -> SelectedOperation<'a> { if handles.is_empty() { panic!("no operations have been added to `Select`"); @@ -485,7 +564,12 @@ pub fn select<'a>( // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro. #[inline] pub fn select_timeout<'a>( - handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], + handles: &mut [( + &'a dyn SelectHandle, + Option>, + usize, + *const u8, + )], timeout: Duration, ) -> Result, SelectTimeoutError> { match Instant::now().checked_add(timeout) { @@ -497,7 +581,12 @@ pub fn select_timeout<'a>( /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. #[inline] pub(crate) fn select_deadline<'a>( - handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], + handles: &mut [( + &'a dyn SelectHandle, + Option>, + usize, + *const u8, + )], deadline: Instant, ) -> Result, SelectTimeoutError> { match run_select(handles, Timeout::At(deadline)) { @@ -597,7 +686,12 @@ pub(crate) fn select_deadline<'a>( /// [`ready_timeout`]: Select::ready_timeout pub struct Select<'a> { /// A list of senders and receivers participating in selection. - handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>, + handles: Vec<( + &'a dyn SelectHandle, + Option>, + usize, + *const u8, + )>, /// The next index to assign to an operation. next_index: usize, @@ -643,7 +737,8 @@ impl<'a> Select<'a> { pub fn send(&mut self, s: &'a Sender) -> usize { let i = self.next_index; let ptr = s as *const Sender<_> as *const u8; - self.handles.push((s, i, ptr)); + let state = s.start(); + self.handles.push((s, state, i, ptr)); self.next_index += 1; i } @@ -665,7 +760,8 @@ impl<'a> Select<'a> { pub fn recv(&mut self, r: &'a Receiver) -> usize { let i = self.next_index; let ptr = r as *const Receiver<_> as *const u8; - self.handles.push((r, i, ptr)); + let state = r.start(); + self.handles.push((r, state, i, ptr)); self.next_index += 1; i } @@ -718,7 +814,7 @@ impl<'a> Select<'a> { .handles .iter() .enumerate() - .find(|(_, (_, i, _))| *i == index) + .find(|(_, (_, _, i, _))| *i == index) .expect("no operation with this index") .0; diff --git a/crossbeam-channel/src/select_macro.rs b/crossbeam-channel/src/select_macro.rs index 3b71e1e50..52b038c06 100644 --- a/crossbeam-channel/src/select_macro.rs +++ b/crossbeam-channel/src/select_macro.rs @@ -685,10 +685,16 @@ macro_rules! crossbeam_channel_internal { $default:tt ) => {{ const _LEN: usize = $crate::crossbeam_channel_internal!(@count ($($cases)*)); - let _handle: &dyn $crate::internal::SelectHandle = &$crate::never::<()>(); + + const _STATE: ( + &'static dyn $crate::internal::SelectHandle, + ::core::option::Option<$crate::internal::BlockingState<'static>>, + usize, + *const u8 + ) = (&(), None, 0, ::std::ptr::null()); #[allow(unused_mut)] - let mut _sel = [(_handle, 0, ::std::ptr::null()); _LEN]; + let mut _sel = [_STATE; _LEN]; $crate::crossbeam_channel_internal!( @add @@ -852,7 +858,7 @@ macro_rules! crossbeam_channel_internal { } unbind(_r) }; - $sel[$i] = ($var, $i, $var as *const $crate::Receiver<_> as *const u8); + $sel[$i] = ($var, $crate::internal::SelectHandle::start($var), $i, $var as *const $crate::Receiver<_> as *const u8); $crate::crossbeam_channel_internal!( @add @@ -884,7 +890,7 @@ macro_rules! crossbeam_channel_internal { } unbind(_s) }; - $sel[$i] = ($var, $i, $var as *const $crate::Sender<_> as *const u8); + $sel[$i] = ($var, $crate::internal::SelectHandle::start($var), $i, $var as *const $crate::Sender<_> as *const u8); $crate::crossbeam_channel_internal!( @add diff --git a/crossbeam-channel/src/waker.rs b/crossbeam-channel/src/waker.rs index 99e0960ee..8134227aa 100644 --- a/crossbeam-channel/src/waker.rs +++ b/crossbeam-channel/src/waker.rs @@ -233,9 +233,9 @@ impl SyncWaker { /// Registers an operation waiting to be ready. #[inline] - pub(crate) fn watch(&self, oper: Operation, cx: &Context) { + pub(crate) fn watch(&self, oper: Operation, cx: &Context, state: &BlockingState<'_>) { self.inner.lock().unwrap().watch(oper, cx); - self.state.park(false); + self.state.park(state.is_waker); } /// Unregisters an operation waiting to be ready. @@ -262,7 +262,8 @@ impl Drop for SyncWaker { /// A guard that manages the state of a blocking operation. #[derive(Clone)] -struct BlockingState<'a> { +#[allow(missing_debug_implementations)] +pub struct BlockingState<'a> { /// True if this thread is the waker thread, meaning it must /// try to notify waiters after it completes. is_waker: bool, @@ -276,12 +277,6 @@ impl BlockingState<'_> { pub(crate) fn unpark(&mut self) { self.is_waker = self.waker.state.unpark(); } - - /// Reset the state of an observer after waking up from parking. - #[inline] - pub(crate) fn unpark_observer(&mut self) { - self.waker.state.unpark_observer(); - } } impl Drop for BlockingState<'_> { @@ -316,7 +311,7 @@ impl WakerState { // if a notification is already set, the waker thread will take care // of further notifications. otherwise we have to notify if there are waiters - if ((state >> NOTIFIED) & (state & NOTIFIED)) > 0 { + if (state >> WAKER) > 0 && (state & NOTIFIED == 0) { return self .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| { @@ -342,14 +337,6 @@ impl WakerState { self.state.fetch_add(update, Ordering::SeqCst); } - /// Remove an observer from the waker state after it was unparked. - /// - /// Observers never become the waking thread because if there were waiters, they - /// are also woken up along with any observers. - fn unpark_observer(&self) { - self.state.fetch_sub(1_u32 << WAKER, Ordering::SeqCst); - } - /// Remove this waiter from the waker state after it was unparked. /// /// Returns `true` if this thread became the waking thread and must call `drop_waker` diff --git a/crossbeam-channel/tests/array.rs b/crossbeam-channel/tests/array.rs index 74bd30258..486f56a78 100644 --- a/crossbeam-channel/tests/array.rs +++ b/crossbeam-channel/tests/array.rs @@ -427,7 +427,6 @@ fn stress_oneshot() { } } -// TODO: failing sometimes #[test] fn stress_iter() { #[cfg(miri)] diff --git a/crossbeam-channel/tests/ready.rs b/crossbeam-channel/tests/ready.rs index ca84f869c..f2c9b0aad 100644 --- a/crossbeam-channel/tests/ready.rs +++ b/crossbeam-channel/tests/ready.rs @@ -516,7 +516,7 @@ fn stress_recv() { let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); - match sel.ready() { + match self.ready() { 0 => assert_eq!(r1.try_recv(), Ok(i)), 1 => assert_eq!(r2.try_recv(), Ok(i)), _ => panic!(),