diff --git a/examples/mpmc.rs b/examples/mpmc.rs new file mode 100644 index 0000000..2966966 --- /dev/null +++ b/examples/mpmc.rs @@ -0,0 +1,29 @@ +use whisk::{Channel, Stream}; + +fn main() { + let executor = pasts::Executor::default(); + let channel = Stream::from(Channel::new()); + for _ in 0..24 { + let channel = channel.clone(); + std::thread::spawn(|| { + pasts::Executor::default().spawn(async move { + println!("Sending..."); + channel.send(Some(1)).await; + let count = Stream::strong_count(&channel); + println!("Sent {count}"); + if count <= 2 { + channel.send(None).await; + } + }) + }); + } + executor.spawn(async move { + let mut c = 0; + while let Some(v) = channel.recv().await { + println!("Received one."); + c += v; + } + println!("Received all."); + assert_eq!(c, 24); + }); +} diff --git a/src/lib.rs b/src/lib.rs index e4a3fe3..8ee1bdd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,10 @@ extern crate alloc; +mod list; +mod tcms; +mod wake; + use alloc::{ sync::{Arc, Weak}, vec::Vec, diff --git a/src/list.rs b/src/list.rs new file mode 100644 index 0000000..4ad12ea --- /dev/null +++ b/src/list.rs @@ -0,0 +1,235 @@ +//! List implementations + +#![allow(unsafe_code)] + +use alloc::{boxed::Box, vec::Vec}; +use core::mem::MaybeUninit; + +/// Dynamic list (grow or fixed) +pub(crate) enum DynList { + Grow(GrowList), + Fixed(FixedList), +} + +impl DynList { + pub(crate) fn merge( + self: &mut Box, + mut other: Box>, + ) { + use DynList::*; + match **self { + Grow(ref mut grow) => { + match *other { + Grow(ref mut l) => grow.0.append(&mut l.0), + Fixed(ref mut l) => { + grow.0.extend( + l.data[..l.size] + .iter() + .map(|x| unsafe { x.assume_init_read() }), + ); + // Don't drop items from other list + core::mem::forget(other); + } + } + } + Fixed(ref mut list) => { + match *other { + Grow(mut l) => { + l.0.splice( + ..0, + list.data[..list.size] + .iter() + .map(|x| unsafe { x.assume_init_read() }), + ); + let mut new = Grow(l); + core::mem::swap(&mut **self, &mut new); + // Don't drop items from this list + core::mem::forget(new); + } + Fixed(ref mut l) => { + if l.len() + list.len() > CAP { + let mut vec = Vec::new(); + vec.extend( + list.data[..list.size] + .iter() + .map(|x| unsafe { x.assume_init_read() }), + ); + vec.extend( + l.data[..l.size] + .iter() + .map(|x| unsafe { x.assume_init_read() }), + ); + let mut new = Grow(GrowList(vec)); + core::mem::swap(&mut **self, &mut new); + // Don't drop items from this list + core::mem::forget(new); + } else { + for item in l.data[..l.size].iter() { + self.push(unsafe { item.assume_init_read() }); + } + } + // Don't drop items from other list + core::mem::forget(other); + } + } + } + } + } +} + +impl Default for DynList { + fn default() -> Self { + Self::Fixed(FixedList::default()) + } +} + +pub(crate) struct GrowList(Vec); + +impl Default for GrowList { + fn default() -> Self { + Self(Vec::default()) + } +} + +pub(crate) struct FixedList { + size: usize, + data: [MaybeUninit; CAP], +} + +impl Default for FixedList { + fn default() -> Self { + let size = 0; + let data = uninit_array::(); + + Self { size, data } + } +} + +pub(crate) trait List { + fn push(&mut self, item: T); + fn pop(&mut self) -> Option; + fn len(&self) -> usize; + fn as_slice(&mut self) -> &mut [T]; +} + +impl List for DynList { + fn push(&mut self, item: T) { + match self { + DynList::Grow(ref mut list) => list.push(item), + DynList::Fixed(ref mut list) => { + if list.len() == CAP { + let mut vec = + Vec::from(unsafe { array_assume_init(&list.data) }); + vec.push(item); + *self = DynList::Grow(GrowList(vec)); + } else { + list.push(item); + } + } + } + } + + fn pop(&mut self) -> Option { + match self { + DynList::Grow(ref mut list) => list.pop(), + DynList::Fixed(ref mut list) => list.pop(), + } + } + + fn len(&self) -> usize { + match self { + DynList::Grow(ref list) => list.len(), + DynList::Fixed(ref list) => list.len(), + } + } + + fn as_slice(&mut self) -> &mut [T] { + match self { + DynList::Grow(ref mut list) => list.as_slice(), + DynList::Fixed(ref mut list) => list.as_slice(), + } + } +} + +impl List for GrowList { + fn push(&mut self, item: T) { + self.0.push(item); + } + + fn pop(&mut self) -> Option { + self.0.pop() + } + + fn len(&self) -> usize { + self.0.len() + } + + fn as_slice(&mut self) -> &mut [T] { + self.0.as_mut_slice() + } +} + +impl List for FixedList { + fn push(&mut self, item: T) { + assert_ne!(self.size, CAP); + self.data[self.size].write(item); + self.size += 1; + } + + fn pop(&mut self) -> Option { + if self.size == 0 { + None + } else { + self.size -= 1; + Some(unsafe { self.data[self.size].assume_init_read() }) + } + } + + fn len(&self) -> usize { + self.size + } + + fn as_slice(&mut self) -> &mut [T] { + unsafe { slice_assume_init_mut(&mut self.data[..self.size]) } + } +} + +impl Drop for FixedList { + fn drop(&mut self) { + for item in self.data[..self.size].iter_mut() { + unsafe { item.assume_init_drop() } + } + } +} + +/// Can be removed once https://github.com/rust-lang/rust/issues/96097 resolves +#[must_use] +#[inline(always)] +const fn uninit_array() -> [MaybeUninit; N] { + // SAFETY: An uninitialized `[MaybeUninit<_>; LEN]` is valid. + unsafe { MaybeUninit::<[MaybeUninit; N]>::uninit().assume_init() } +} + +#[inline(always)] +unsafe fn array_assume_init( + array: &[MaybeUninit; N], +) -> [T; N] { + // SAFETY: + // * The caller guarantees that all elements of the array are initialized + // * `MaybeUninit` and T are guaranteed to have the same layout + // * `MaybeUninit` does not drop, so there are no double-frees + // And thus the conversion is safe + let array: *const _ = array; + let array: *const [T; N] = array.cast(); + + array.read() +} + +#[inline(always)] +unsafe fn slice_assume_init_mut(slice: &mut [MaybeUninit]) -> &mut [T] { + // SAFETY: similar to safety notes for `slice_get_ref`, but we have a + // mutable reference which is also guaranteed to be valid for writes. + let slice: *mut _ = slice; + + &mut *(slice as *mut [T]) +} diff --git a/src/tcms.rs b/src/tcms.rs new file mode 100644 index 0000000..4d54f7b --- /dev/null +++ b/src/tcms.rs @@ -0,0 +1,79 @@ +//! Lockless synchronization + +#![allow(unsafe_code)] + +use alloc::boxed::Box; +use core::{ + ptr, + sync::atomic::{AtomicPtr, Ordering}, +}; + +/// Take-create-merge-swap +/// +/// Essentially the opposite of RCU (read-copy-update), optimized for writing +/// rather than reading. +/// +/// - Task 1 takes the inner value +/// - Task 2 sees Task 1 has ownership of value +/// - Task 2 creates a new empty/default value +/// - Task 2 writes to new value +/// - Task 1 returns value +/// - Task 2 checks if value has been returned and swaps if not +/// - Task 2 takes ownership of other value if returned and merges then returns +/// +/// One thing to keep in mind when using this type is that not all values will +/// be available at all times. +pub(crate) struct Tcms(AtomicPtr); + +impl Tcms { + /// Create new TCMS + pub(crate) fn new() -> Self { + Self(AtomicPtr::new(Box::into_raw(Box::new(T::default())))) + } + + /// Run `f` with merger `m`. + /// + /// Merger is unstable, can't expect order to be preserved + pub(crate) fn with( + &self, + f: impl FnOnce(&mut T) -> R, + m: impl Fn(&mut Box, Box), + ) -> R { + // Swap with null pointer + let list = self.0.swap(ptr::null_mut(), Ordering::Acquire); + let mut list = if list.is_null() { + Box::new(T::default()) + } else { + unsafe { Box::from_raw(list) } + }; + + // Run closure with list + let r = f(&mut *list); + + // Merge lists if needed + let mut new = Box::into_raw(list); + while self + .0 + .compare_exchange( + core::ptr::null_mut(), + new, + Ordering::Release, + Ordering::Relaxed, + ) + .is_err() + { + let other = self.0.swap(ptr::null_mut(), Ordering::Acquire); + if !other.is_null() { + let mut a = unsafe { Box::from_raw(new) }; + let b = unsafe { Box::from_raw(other) }; + m(&mut a, b); + new = Box::into_raw(a); + } else { + // Too much contention with other task, try again + core::hint::spin_loop(); + }; + } + + r + } +} diff --git a/src/wake.rs b/src/wake.rs new file mode 100644 index 0000000..1973a4f --- /dev/null +++ b/src/wake.rs @@ -0,0 +1,99 @@ +//! WakeList implementation + +use alloc::boxed::Box; +use core::{ + sync::atomic::{AtomicUsize, Ordering}, + task::Waker, +}; + +use crate::{ + list::{DynList, List}, + tcms::Tcms, +}; + +pub(crate) struct RecvHandle(usize); + +pub(crate) struct SendHandle(usize); + +/// Lockless MPMC multi-waker +pub(crate) struct WakeList { + /// List of wakers, with up to 2 on the stack before falling back to heap + send: Tcms), 2>>, + /// List of wakers, with up to 2 on the stack before falling back to heap + recv: Tcms), 2>>, + /// List of garbage, with up to 2 on the stack before falling back to heap + garbage: Tcms>, + /// Next slot + slot: AtomicUsize, +} + +impl WakeList { + /// Create a new lockless multi-waker + pub(crate) fn new() -> Self { + let send = Tcms::new(); + let recv = Tcms::new(); + let garbage = Tcms::new(); + let slot = AtomicUsize::new(0); + + Self { + send, + recv, + garbage, + slot, + } + } + + /// Set waker + fn when_recv(&self, waker: Waker) -> RecvHandle { + let id = self + .garbage + .with(|g| g.pop(), merge) + .unwrap_or_else(|| self.slot.fetch_add(1, Ordering::Relaxed)); + self.recv.with(|list| list.push((id, Some(waker))), merge); + RecvHandle(id) + } + + /// Set waker + fn when_send(&self, waker: Waker) -> SendHandle { + let id = self + .garbage + .with(|g| g.pop(), merge) + .unwrap_or_else(|| self.slot.fetch_add(1, Ordering::Relaxed)); + self.send.with(|list| list.push((id, Some(waker))), merge); + SendHandle(id) + } + + /// Free a send handle to be reused + fn begin_free_send(&self, _handle: SendHandle) { + todo!() + } + + /// Free a recv handle to be reused + fn begin_free_recv(&self, _handle: RecvHandle) { + todo!() + } + + /// Wake one waker + fn begin_wake_one_send(&self) { + todo!() + } + + /// Wake all wakers + fn begin_wake_all_send(&self) { + todo!() + } + + /// Wake one waker + fn begin_wake_one_recv(&self) { + todo!() + } + + /// Wake all wakers + fn begin_wake_all_recv(&self) { + todo!() + } +} + +fn merge(orig: &mut Box>, other: Box>) { + orig.merge(other) +}