Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lockless WakeList #28

Draft
wants to merge 2 commits into
base: stable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions examples/mpmc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use whisk::{Channel, Stream};

fn main() {
let executor = pasts::Executor::default();
let channel = Stream::from(Channel::new());
for _ in 0..24 {
let channel = channel.clone();
std::thread::spawn(|| {
pasts::Executor::default().spawn(async move {
println!("Sending...");
channel.send(Some(1)).await;
let count = Stream::strong_count(&channel);
println!("Sent {count}");
if count <= 2 {
channel.send(None).await;
}
})
});
}
executor.spawn(async move {
let mut c = 0;
while let Some(v) = channel.recv().await {
println!("Received one.");
c += v;
}
println!("Received all.");
assert_eq!(c, 24);
});
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@

extern crate alloc;

mod list;
mod tcms;
mod wake;

use alloc::{
sync::{Arc, Weak},
vec::Vec,
Expand Down
235 changes: 235 additions & 0 deletions src/list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
//! List implementations

#![allow(unsafe_code)]

use alloc::{boxed::Box, vec::Vec};
use core::mem::MaybeUninit;

/// Dynamic list (grow or fixed)
pub(crate) enum DynList<T, const CAP: usize> {
Grow(GrowList<T>),
Fixed(FixedList<T, CAP>),
}

impl<T, const CAP: usize> DynList<T, CAP> {
pub(crate) fn merge<const PAC: usize>(
self: &mut Box<Self>,
mut other: Box<DynList<T, PAC>>,
) {
use DynList::*;
match **self {
Grow(ref mut grow) => {
match *other {
Grow(ref mut l) => grow.0.append(&mut l.0),
Fixed(ref mut l) => {
grow.0.extend(
l.data[..l.size]
.iter()
.map(|x| unsafe { x.assume_init_read() }),
);
// Don't drop items from other list
core::mem::forget(other);
}
}
}
Fixed(ref mut list) => {
match *other {
Grow(mut l) => {
l.0.splice(
..0,
list.data[..list.size]
.iter()
.map(|x| unsafe { x.assume_init_read() }),
);
let mut new = Grow(l);
core::mem::swap(&mut **self, &mut new);
// Don't drop items from this list
core::mem::forget(new);
}
Fixed(ref mut l) => {
if l.len() + list.len() > CAP {
let mut vec = Vec::new();
vec.extend(
list.data[..list.size]
.iter()
.map(|x| unsafe { x.assume_init_read() }),
);
vec.extend(
l.data[..l.size]
.iter()
.map(|x| unsafe { x.assume_init_read() }),
);
let mut new = Grow(GrowList(vec));
core::mem::swap(&mut **self, &mut new);
// Don't drop items from this list
core::mem::forget(new);
} else {
for item in l.data[..l.size].iter() {
self.push(unsafe { item.assume_init_read() });
}
}
// Don't drop items from other list
core::mem::forget(other);
}
}
}
}
}
}

impl<T, const CAP: usize> Default for DynList<T, CAP> {
fn default() -> Self {
Self::Fixed(FixedList::default())
}
}

pub(crate) struct GrowList<T>(Vec<T>);

impl<T> Default for GrowList<T> {
fn default() -> Self {
Self(Vec::default())
}
}

pub(crate) struct FixedList<T, const CAP: usize> {
size: usize,
data: [MaybeUninit<T>; CAP],
}

impl<T, const CAP: usize> Default for FixedList<T, CAP> {
fn default() -> Self {
let size = 0;
let data = uninit_array::<T, CAP>();

Self { size, data }
}
}

pub(crate) trait List<T> {
fn push(&mut self, item: T);
fn pop(&mut self) -> Option<T>;
fn len(&self) -> usize;
fn as_slice(&mut self) -> &mut [T];
}

impl<T, const CAP: usize> List<T> for DynList<T, CAP> {
fn push(&mut self, item: T) {
match self {
DynList::Grow(ref mut list) => list.push(item),
DynList::Fixed(ref mut list) => {
if list.len() == CAP {
let mut vec =
Vec::from(unsafe { array_assume_init(&list.data) });
vec.push(item);
*self = DynList::Grow(GrowList(vec));
} else {
list.push(item);
}
}
}
}

fn pop(&mut self) -> Option<T> {
match self {
DynList::Grow(ref mut list) => list.pop(),
DynList::Fixed(ref mut list) => list.pop(),
}
}

fn len(&self) -> usize {
match self {
DynList::Grow(ref list) => list.len(),
DynList::Fixed(ref list) => list.len(),
}
}

fn as_slice(&mut self) -> &mut [T] {
match self {
DynList::Grow(ref mut list) => list.as_slice(),
DynList::Fixed(ref mut list) => list.as_slice(),
}
}
}

impl<T> List<T> for GrowList<T> {
fn push(&mut self, item: T) {
self.0.push(item);
}

fn pop(&mut self) -> Option<T> {
self.0.pop()
}

fn len(&self) -> usize {
self.0.len()
}

fn as_slice(&mut self) -> &mut [T] {
self.0.as_mut_slice()
}
}

impl<T, const CAP: usize> List<T> for FixedList<T, CAP> {
fn push(&mut self, item: T) {
assert_ne!(self.size, CAP);
self.data[self.size].write(item);
self.size += 1;
}

fn pop(&mut self) -> Option<T> {
if self.size == 0 {
None
} else {
self.size -= 1;
Some(unsafe { self.data[self.size].assume_init_read() })
}
}

fn len(&self) -> usize {
self.size
}

fn as_slice(&mut self) -> &mut [T] {
unsafe { slice_assume_init_mut(&mut self.data[..self.size]) }
}
}

impl<T, const CAP: usize> Drop for FixedList<T, CAP> {
fn drop(&mut self) {
for item in self.data[..self.size].iter_mut() {
unsafe { item.assume_init_drop() }
}
}
}

/// Can be removed once https://github.com/rust-lang/rust/issues/96097 resolves
#[must_use]
#[inline(always)]
const fn uninit_array<T, const N: usize>() -> [MaybeUninit<T>; N] {
// SAFETY: An uninitialized `[MaybeUninit<_>; LEN]` is valid.
unsafe { MaybeUninit::<[MaybeUninit<T>; N]>::uninit().assume_init() }
}

#[inline(always)]
unsafe fn array_assume_init<T, const N: usize>(
array: &[MaybeUninit<T>; N],
) -> [T; N] {
// SAFETY:
// * The caller guarantees that all elements of the array are initialized
// * `MaybeUninit<T>` and T are guaranteed to have the same layout
// * `MaybeUninit` does not drop, so there are no double-frees
// And thus the conversion is safe
let array: *const _ = array;
let array: *const [T; N] = array.cast();

array.read()
}

#[inline(always)]
unsafe fn slice_assume_init_mut<T>(slice: &mut [MaybeUninit<T>]) -> &mut [T] {
// SAFETY: similar to safety notes for `slice_get_ref`, but we have a
// mutable reference which is also guaranteed to be valid for writes.
let slice: *mut _ = slice;

&mut *(slice as *mut [T])
}
79 changes: 79 additions & 0 deletions src/tcms.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//! Lockless synchronization

#![allow(unsafe_code)]

use alloc::boxed::Box;
use core::{
ptr,
sync::atomic::{AtomicPtr, Ordering},
};

/// Take-create-merge-swap
///
/// Essentially the opposite of RCU (read-copy-update), optimized for writing
/// rather than reading.
///
/// - Task 1 takes the inner value
/// - Task 2 sees Task 1 has ownership of value
/// - Task 2 creates a new empty/default value
/// - Task 2 writes to new value
/// - Task 1 returns value
/// - Task 2 checks if value has been returned and swaps if not
/// - Task 2 takes ownership of other value if returned and merges then returns
///
/// One thing to keep in mind when using this type is that not all values will
/// be available at all times.
pub(crate) struct Tcms<T: Default>(AtomicPtr<T>);

impl<T: Default> Tcms<T> {
/// Create new TCMS
pub(crate) fn new() -> Self {
Self(AtomicPtr::new(Box::into_raw(Box::new(T::default()))))
}

/// Run `f` with merger `m`.
///
/// Merger is unstable, can't expect order to be preserved
pub(crate) fn with<R>(
&self,
f: impl FnOnce(&mut T) -> R,
m: impl Fn(&mut Box<T>, Box<T>),
) -> R {
// Swap with null pointer
let list = self.0.swap(ptr::null_mut(), Ordering::Acquire);
let mut list = if list.is_null() {
Box::new(T::default())
} else {
unsafe { Box::from_raw(list) }
};

// Run closure with list
let r = f(&mut *list);

// Merge lists if needed
let mut new = Box::into_raw(list);
while self
.0
.compare_exchange(
core::ptr::null_mut(),
new,
Ordering::Release,
Ordering::Relaxed,
)
.is_err()
{
let other = self.0.swap(ptr::null_mut(), Ordering::Acquire);
if !other.is_null() {
let mut a = unsafe { Box::from_raw(new) };
let b = unsafe { Box::from_raw(other) };
m(&mut a, b);
new = Box::into_raw(a);
} else {
// Too much contention with other task, try again
core::hint::spin_loop();
};
}

r
}
}
Loading