Skip to content

Commit

Permalink
Docs + Clippy + Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ecton committed Aug 11, 2024
1 parent d62fef4 commit 15e892f
Show file tree
Hide file tree
Showing 11 changed files with 453 additions and 286 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,19 @@ flume = "0.11.0"
serde = { workspace = true }
tracing-subscriber = { workspace = true }

[lints.clippy]
[workspace.lints.clippy]
pedantic = { level = "warn", priority = -1 }
module_name_repetitions = "allow"
missing_errors_doc = "allow"
missing_panics_doc = "allow"

[lints.rust]
[workspace.lints.rust]
unsafe_code = "forbid"
missing_docs = "warn"

[lints]
workspace = true

[[test]]
name = "harness"
harness = false
Expand Down
3 changes: 3 additions & 0 deletions muse-channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ edition = "2021"
[dependencies]
muse-lang = { workspace = true }
parking_lot = { workspace = true }

[lints]
workspace = true
202 changes: 158 additions & 44 deletions muse-channel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
//! A multi-producer, multi-consumer channel of [`RootedValue`]s.
//!
//! This channel supports sending and receiving from blocking and async code,
//! and the channel types [`ValueSender`] and [`ValueReceiver`] can be passed
//! into the Muse runtime.
use std::{
collections::VecDeque,
fmt::Debug,
Expand All @@ -15,7 +21,7 @@ use muse_lang::{
refuse::{CollectionGuard, ContainsNoRefs},
runtime::{
list::List,
symbol::SymbolRef,
symbol::{StaticSymbol, SymbolRef},
value::{
CustomType, Dynamic, Rooted, RootedValue, RustFunction, RustType,
StaticRustFunctionTable, TypeRef, Value,
Expand Down Expand Up @@ -46,50 +52,95 @@ fn new_channel(limit: Option<usize>) -> (ValueSender, ValueReceiver) {
(ValueSender { data: data.clone() }, ValueReceiver { data })
}

/// Returns a new channel with no capacity restrictions.
///
/// This channel will panic in out-of-memory situations.
#[must_use]
pub fn unbounded() -> (ValueSender, ValueReceiver) {
new_channel(None)
}

/// Returns a new channel with a fixed capacity.
#[must_use]
pub fn bounded(limit: usize) -> (ValueSender, ValueReceiver) {
new_channel(Some(limit))
}

pub trait WithNewChannel {
fn with_new_channel(self, guard: &mut CollectionGuard<'_>) -> Self;
/// Registers a `new_channel` Muse function in `self`.
pub trait WithNewChannel: Sized {
/// Registers a `new_channel` Muse function in `self`.
#[must_use]
fn with_new_channel(self, guard: &mut CollectionGuard<'_>) -> Self {
self.with_new_channel_named(&NEW_CHANNEL, guard)
}
/// Registers [`new_channel_function()`] in `self` as `name`.
#[must_use]
fn with_new_channel_named(
self,
name: impl Into<SymbolRef>,
guard: &mut CollectionGuard<'_>,
) -> Self;
}

impl WithNewChannel for Vm {
fn with_new_channel(self, guard: &mut CollectionGuard<'_>) -> Self {
fn with_new_channel_named(
self,
name: impl Into<SymbolRef>,
guard: &mut CollectionGuard<'_>,
) -> Self {
let module = self.context(guard).root_module();
let _same_module = module.with_new_channel(guard);
let _same_module = module.with_new_channel_named(name, guard);
self
}
}

/// A symbol for the default name for the new channel function.
pub static NEW_CHANNEL: StaticSymbol = StaticSymbol::new("new_channel");

impl WithNewChannel for Dynamic<Module> {
fn with_new_channel(self, guard: &mut CollectionGuard<'_>) -> Self {
fn with_new_channel_named(
self,
name: impl Into<SymbolRef>,
guard: &mut CollectionGuard<'_>,
) -> Self {
if let Some(module) = self.as_rooted(guard) {
declare_new_channel_in(&module, guard);
declare_new_channel_in(name, &module, guard);
}
self
}
}

impl WithNewChannel for Rooted<Module> {
fn with_new_channel(self, guard: &mut CollectionGuard<'_>) -> Self {
declare_new_channel_in(&self, guard);
fn with_new_channel_named(
self,
name: impl Into<SymbolRef>,
guard: &mut CollectionGuard<'_>,
) -> Self {
declare_new_channel_in(name, &self, guard);
self
}
}

pub fn declare_new_channel_in(module: &Module, guard: &CollectionGuard<'_>) {
/// Declares [`new_channel_function()`] in `module` with `name`.
pub fn declare_new_channel_in(
name: impl Into<SymbolRef>,
module: &Module,
guard: &CollectionGuard<'_>,
) {
module.declare(
"new_channel",
name,
Access::Public,
Value::dynamic(new_channel_function(), guard),
);
}

/// Returns a function that creates a new channel.
///
/// When called with no arguments, this function returns an unbounded channel.
///
/// When called with one argument, it is converted to an integer and used as the
/// bounds of a bounded channel.
#[must_use]
pub fn new_channel_function() -> RustFunction {
RustFunction::new(|vm: &mut VmContext<'_, '_>, arity| {
let (sender, receiver) = match arity.0 {
Expand All @@ -112,43 +163,78 @@ pub fn new_channel_function() -> RustFunction {
})
}

/// A sender of [`RootedValue`]s to be received by one or more [`ValueReceiver`]s.
pub struct ValueSender {
data: Arc<ChannelData>,
}

impl ValueSender {
/// Pushes `value` to the queue for [`ValueReceiver`]s to read from.
///
/// If this sender is for a bounded channel and the channel is full, this
/// function will block the current thread until space is available or it is
/// disconnected. Use [`send_async(value).await`](Self::send_async) in async
/// code instead to ensure proper interaction with the async runtime.
///
/// # Errors
///
/// This function returns `value` if the channel is disconnected before
/// `value` can be pushed.
pub fn send(&self, value: RootedValue) -> Result<(), RootedValue> {
if self.is_disconnected() {
return Err(value);
}

match self.data.limit {
Some(limit) => {
match self.bounded_send(value, limit, |locked| {
if let Some(limit) = self.data.limit {
if let Err(TrySendError::Disconnected(value) | TrySendError::Full(value)) = self
.bounded_send(value, limit, |locked| {
self.data.value_read.wait(locked);
ControlFlow::Continue(())
}) {
Ok(()) => Ok(()),
Err(TrySendError::Disconnected(value) | TrySendError::Full(value)) => {
Err(value)
}
}
})
{
Err(value)
} else {
Ok(())
}
None => Ok(self.unbounded_send(value)),
} else {
self.unbounded_send(value);
Ok(())
}
}

/// Tries to push `value` to the queue for [`ValueReceiver`]s to read from.
///
/// This function is safe to be invoked from both async and non-async code.
///
/// # Errors
///
/// - [`TrySendError::Disconnected`]: All associated [`ValueReceiver`]s have
/// been dropped.
/// - [`TrySendError::Full`]: The value cannot be pushed due to the channel
/// being full.
pub fn try_send(&self, value: RootedValue) -> Result<(), TrySendError> {
if self.is_disconnected() {
return Err(TrySendError::Disconnected(value));
}

match self.data.limit {
Some(limit) => self.bounded_send(value, limit, |_| ControlFlow::Break(())),
None => Ok(self.unbounded_send(value)),
if let Some(limit) = self.data.limit {
self.bounded_send(value, limit, |_| ControlFlow::Break(()))
} else {
self.unbounded_send(value);
Ok(())
}
}

/// Pushes `value` to the queue for [`ValueReceiver`]s to read from.
///
/// If this sender is for a bounded channel and the channel is full, this
/// function will block the current task until space is available or it is
/// disconnected.
///
/// # Errors
///
/// This function returns `value` if the channel is disconnected before
/// `value` can be pushed.
pub fn send_async(&self, value: RootedValue) -> SendAsync<'_> {
SendAsync {
value: Some(value),
Expand Down Expand Up @@ -181,7 +267,7 @@ impl ValueSender {
}

self.finish_send(value, locked);
return Ok(());
Ok(())
}

fn finish_send(&self, value: RootedValue, mut locked: MutexGuard<'_, LockedData>) {
Expand All @@ -193,18 +279,26 @@ impl ValueSender {
self.data.value_sent.notify_all();
}

/// Returns the number of associated [`ValueReceiver`]s that have not been
/// dropped.
#[must_use]
pub fn receivers(&self) -> usize {
self.data.receivers.load(Ordering::Relaxed)
}

/// Returns true if all associated [`ValueReceiver`]s have been dropped.
#[must_use]
pub fn is_disconnected(&self) -> bool {
self.receivers() == 0
}
}

/// Errors returned from [`ValueSender::try_send`].
#[derive(Debug, Clone, PartialEq)]
pub enum TrySendError {
/// All associated [`ValueReceiver`]s have been dropped.
Disconnected(RootedValue),
/// The value cannot be pushed due to the channel being full.
Full(RootedValue),
}

Expand Down Expand Up @@ -299,11 +393,19 @@ impl Drop for ValueSender {
}
}

/// A receiver of [`RootedValue`]s that are sent by one or more
/// [`ValueSender`]s.
pub struct ValueReceiver {
data: Arc<ChannelData>,
}

impl ValueReceiver {
/// Reads the oldest [`RootedValue`] from the channel, blocking the current
/// thread until a value is received.
///
/// Returns `None` if all [`ValueSender`]s have been dropped before a value
/// is received.
#[must_use]
pub fn recv(&self) -> Option<RootedValue> {
let mut locked = self.data.locked.lock();
loop {
Expand All @@ -322,14 +424,24 @@ impl ValueReceiver {
}
}

/// Reads the oldest [`RootedValue`] from the channel, blocking the current
/// task until a value is received.
///
/// Returns `None` if all [`ValueSender`]s have been dropped before a value
/// is received.
pub fn recv_async(&self) -> RecvAsync<'_> {
RecvAsync(self)
}

/// Returns the number of associated [`ValueSender`]s that have not been
/// dropped.
#[must_use]
pub fn senders(&self) -> usize {
self.data.senders.load(Ordering::Relaxed)
}

/// Returns true if all associated [`ValueSender`]s have been dropped.
#[must_use]
pub fn is_disconnected(&self) -> bool {
self.senders() == 0
}
Expand Down Expand Up @@ -398,6 +510,8 @@ impl Drop for ValueReceiver {
}
}

/// A future that sends a [`RootedValue`] when awaited.
#[must_use = "Futures must be awaited to execute"]
pub struct SendAsync<'a> {
value: Option<RootedValue>,
sender: &'a ValueSender,
Expand All @@ -414,31 +528,31 @@ impl Future for SendAsync<'_> {
return Poll::Ready(Err(value));
}

match self.sender.data.limit {
Some(limit) => {
match self.sender.bounded_send(value, limit, |locked| {
let will_wake = locked.send_wakers.iter().any(|w| w.will_wake(cx.waker()));
if !will_wake {
locked.send_wakers.push(cx.waker().clone());
}
ControlFlow::Break(())
}) {
Ok(()) => Poll::Ready(Ok(())),
Err(TrySendError::Disconnected(value)) => Poll::Ready(Err(value)),
Err(TrySendError::Full(value)) => {
self.value = Some(value);
Poll::Pending
}
if let Some(limit) = self.sender.data.limit {
let send_result = self.sender.bounded_send(value, limit, |locked| {
let will_wake = locked.send_wakers.iter().any(|w| w.will_wake(cx.waker()));
if !will_wake {
locked.send_wakers.push(cx.waker().clone());
}
ControlFlow::Break(())
});
match send_result {
Ok(()) => Poll::Ready(Ok(())),
Err(TrySendError::Disconnected(value)) => Poll::Ready(Err(value)),
Err(TrySendError::Full(value)) => {
self.value = Some(value);
Poll::Pending
}
}
None => {
self.sender.unbounded_send(value);
Poll::Ready(Ok(()))
}
} else {
self.sender.unbounded_send(value);
Poll::Ready(Ok(()))
}
}
}

/// A future that receives a [`RootedValue`] when awaited.
#[must_use = "Futures must be awaited to execute"]
pub struct RecvAsync<'a>(&'a ValueReceiver);

impl Future for RecvAsync<'_> {
Expand Down
Loading

0 comments on commit 15e892f

Please sign in to comment.