diff --git a/Cargo.toml b/Cargo.toml index 173c086..a66f304 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,16 +57,19 @@ flume = "0.11.0" serde = { workspace = true } tracing-subscriber = { workspace = true } -[lints.clippy] +[workspace.lints.clippy] pedantic = { level = "warn", priority = -1 } module_name_repetitions = "allow" missing_errors_doc = "allow" missing_panics_doc = "allow" -[lints.rust] +[workspace.lints.rust] unsafe_code = "forbid" missing_docs = "warn" +[lints] +workspace = true + [[test]] name = "harness" harness = false diff --git a/muse-channel/Cargo.toml b/muse-channel/Cargo.toml index 68ccb20..9bbe32d 100644 --- a/muse-channel/Cargo.toml +++ b/muse-channel/Cargo.toml @@ -6,3 +6,6 @@ edition = "2021" [dependencies] muse-lang = { workspace = true } parking_lot = { workspace = true } + +[lints] +workspace = true diff --git a/muse-channel/src/lib.rs b/muse-channel/src/lib.rs index 5872961..5e5a93e 100644 --- a/muse-channel/src/lib.rs +++ b/muse-channel/src/lib.rs @@ -1,3 +1,9 @@ +//! A multi-producer, multi-consumer channel of [`RootedValue`]s. +//! +//! This channel supports sending and receiving from blocking and async code, +//! and the channel types [`ValueSender`] and [`ValueReceiver`] can be passed +//! into the Muse runtime. + use std::{ collections::VecDeque, fmt::Debug, @@ -15,7 +21,7 @@ use muse_lang::{ refuse::{CollectionGuard, ContainsNoRefs}, runtime::{ list::List, - symbol::SymbolRef, + symbol::{StaticSymbol, SymbolRef}, value::{ CustomType, Dynamic, Rooted, RootedValue, RustFunction, RustType, StaticRustFunctionTable, TypeRef, Value, @@ -46,50 +52,95 @@ fn new_channel(limit: Option) -> (ValueSender, ValueReceiver) { (ValueSender { data: data.clone() }, ValueReceiver { data }) } +/// Returns a new channel with no capacity restrictions. +/// +/// This channel will panic in out-of-memory situations. +#[must_use] pub fn unbounded() -> (ValueSender, ValueReceiver) { new_channel(None) } +/// Returns a new channel with a fixed capacity. +#[must_use] pub fn bounded(limit: usize) -> (ValueSender, ValueReceiver) { new_channel(Some(limit)) } -pub trait WithNewChannel { - fn with_new_channel(self, guard: &mut CollectionGuard<'_>) -> Self; +/// Registers a `new_channel` Muse function in `self`. +pub trait WithNewChannel: Sized { + /// Registers a `new_channel` Muse function in `self`. + #[must_use] + fn with_new_channel(self, guard: &mut CollectionGuard<'_>) -> Self { + self.with_new_channel_named(&NEW_CHANNEL, guard) + } + /// Registers [`new_channel_function()`] in `self` as `name`. + #[must_use] + fn with_new_channel_named( + self, + name: impl Into, + guard: &mut CollectionGuard<'_>, + ) -> Self; } impl WithNewChannel for Vm { - fn with_new_channel(self, guard: &mut CollectionGuard<'_>) -> Self { + fn with_new_channel_named( + self, + name: impl Into, + guard: &mut CollectionGuard<'_>, + ) -> Self { let module = self.context(guard).root_module(); - let _same_module = module.with_new_channel(guard); + let _same_module = module.with_new_channel_named(name, guard); self } } +/// A symbol for the default name for the new channel function. +pub static NEW_CHANNEL: StaticSymbol = StaticSymbol::new("new_channel"); + impl WithNewChannel for Dynamic { - fn with_new_channel(self, guard: &mut CollectionGuard<'_>) -> Self { + fn with_new_channel_named( + self, + name: impl Into, + guard: &mut CollectionGuard<'_>, + ) -> Self { if let Some(module) = self.as_rooted(guard) { - declare_new_channel_in(&module, guard); + declare_new_channel_in(name, &module, guard); } self } } impl WithNewChannel for Rooted { - fn with_new_channel(self, guard: &mut CollectionGuard<'_>) -> Self { - declare_new_channel_in(&self, guard); + fn with_new_channel_named( + self, + name: impl Into, + guard: &mut CollectionGuard<'_>, + ) -> Self { + declare_new_channel_in(name, &self, guard); self } } -pub fn declare_new_channel_in(module: &Module, guard: &CollectionGuard<'_>) { +/// Declares [`new_channel_function()`] in `module` with `name`. +pub fn declare_new_channel_in( + name: impl Into, + module: &Module, + guard: &CollectionGuard<'_>, +) { module.declare( - "new_channel", + name, Access::Public, Value::dynamic(new_channel_function(), guard), ); } +/// Returns a function that creates a new channel. +/// +/// When called with no arguments, this function returns an unbounded channel. +/// +/// When called with one argument, it is converted to an integer and used as the +/// bounds of a bounded channel. +#[must_use] pub fn new_channel_function() -> RustFunction { RustFunction::new(|vm: &mut VmContext<'_, '_>, arity| { let (sender, receiver) = match arity.0 { @@ -112,43 +163,78 @@ pub fn new_channel_function() -> RustFunction { }) } +/// A sender of [`RootedValue`]s to be received by one or more [`ValueReceiver`]s. pub struct ValueSender { data: Arc, } impl ValueSender { + /// Pushes `value` to the queue for [`ValueReceiver`]s to read from. + /// + /// If this sender is for a bounded channel and the channel is full, this + /// function will block the current thread until space is available or it is + /// disconnected. Use [`send_async(value).await`](Self::send_async) in async + /// code instead to ensure proper interaction with the async runtime. + /// + /// # Errors + /// + /// This function returns `value` if the channel is disconnected before + /// `value` can be pushed. pub fn send(&self, value: RootedValue) -> Result<(), RootedValue> { if self.is_disconnected() { return Err(value); } - match self.data.limit { - Some(limit) => { - match self.bounded_send(value, limit, |locked| { + if let Some(limit) = self.data.limit { + if let Err(TrySendError::Disconnected(value) | TrySendError::Full(value)) = self + .bounded_send(value, limit, |locked| { self.data.value_read.wait(locked); ControlFlow::Continue(()) - }) { - Ok(()) => Ok(()), - Err(TrySendError::Disconnected(value) | TrySendError::Full(value)) => { - Err(value) - } - } + }) + { + Err(value) + } else { + Ok(()) } - None => Ok(self.unbounded_send(value)), + } else { + self.unbounded_send(value); + Ok(()) } } + /// Tries to push `value` to the queue for [`ValueReceiver`]s to read from. + /// + /// This function is safe to be invoked from both async and non-async code. + /// + /// # Errors + /// + /// - [`TrySendError::Disconnected`]: All associated [`ValueReceiver`]s have + /// been dropped. + /// - [`TrySendError::Full`]: The value cannot be pushed due to the channel + /// being full. pub fn try_send(&self, value: RootedValue) -> Result<(), TrySendError> { if self.is_disconnected() { return Err(TrySendError::Disconnected(value)); } - match self.data.limit { - Some(limit) => self.bounded_send(value, limit, |_| ControlFlow::Break(())), - None => Ok(self.unbounded_send(value)), + if let Some(limit) = self.data.limit { + self.bounded_send(value, limit, |_| ControlFlow::Break(())) + } else { + self.unbounded_send(value); + Ok(()) } } + /// Pushes `value` to the queue for [`ValueReceiver`]s to read from. + /// + /// If this sender is for a bounded channel and the channel is full, this + /// function will block the current task until space is available or it is + /// disconnected. + /// + /// # Errors + /// + /// This function returns `value` if the channel is disconnected before + /// `value` can be pushed. pub fn send_async(&self, value: RootedValue) -> SendAsync<'_> { SendAsync { value: Some(value), @@ -181,7 +267,7 @@ impl ValueSender { } self.finish_send(value, locked); - return Ok(()); + Ok(()) } fn finish_send(&self, value: RootedValue, mut locked: MutexGuard<'_, LockedData>) { @@ -193,18 +279,26 @@ impl ValueSender { self.data.value_sent.notify_all(); } + /// Returns the number of associated [`ValueReceiver`]s that have not been + /// dropped. + #[must_use] pub fn receivers(&self) -> usize { self.data.receivers.load(Ordering::Relaxed) } + /// Returns true if all associated [`ValueReceiver`]s have been dropped. + #[must_use] pub fn is_disconnected(&self) -> bool { self.receivers() == 0 } } +/// Errors returned from [`ValueSender::try_send`]. #[derive(Debug, Clone, PartialEq)] pub enum TrySendError { + /// All associated [`ValueReceiver`]s have been dropped. Disconnected(RootedValue), + /// The value cannot be pushed due to the channel being full. Full(RootedValue), } @@ -299,11 +393,19 @@ impl Drop for ValueSender { } } +/// A receiver of [`RootedValue`]s that are sent by one or more +/// [`ValueSender`]s. pub struct ValueReceiver { data: Arc, } impl ValueReceiver { + /// Reads the oldest [`RootedValue`] from the channel, blocking the current + /// thread until a value is received. + /// + /// Returns `None` if all [`ValueSender`]s have been dropped before a value + /// is received. + #[must_use] pub fn recv(&self) -> Option { let mut locked = self.data.locked.lock(); loop { @@ -322,14 +424,24 @@ impl ValueReceiver { } } + /// Reads the oldest [`RootedValue`] from the channel, blocking the current + /// task until a value is received. + /// + /// Returns `None` if all [`ValueSender`]s have been dropped before a value + /// is received. pub fn recv_async(&self) -> RecvAsync<'_> { RecvAsync(self) } + /// Returns the number of associated [`ValueSender`]s that have not been + /// dropped. + #[must_use] pub fn senders(&self) -> usize { self.data.senders.load(Ordering::Relaxed) } + /// Returns true if all associated [`ValueSender`]s have been dropped. + #[must_use] pub fn is_disconnected(&self) -> bool { self.senders() == 0 } @@ -398,6 +510,8 @@ impl Drop for ValueReceiver { } } +/// A future that sends a [`RootedValue`] when awaited. +#[must_use = "Futures must be awaited to execute"] pub struct SendAsync<'a> { value: Option, sender: &'a ValueSender, @@ -414,31 +528,31 @@ impl Future for SendAsync<'_> { return Poll::Ready(Err(value)); } - match self.sender.data.limit { - Some(limit) => { - match self.sender.bounded_send(value, limit, |locked| { - let will_wake = locked.send_wakers.iter().any(|w| w.will_wake(cx.waker())); - if !will_wake { - locked.send_wakers.push(cx.waker().clone()); - } - ControlFlow::Break(()) - }) { - Ok(()) => Poll::Ready(Ok(())), - Err(TrySendError::Disconnected(value)) => Poll::Ready(Err(value)), - Err(TrySendError::Full(value)) => { - self.value = Some(value); - Poll::Pending - } + if let Some(limit) = self.sender.data.limit { + let send_result = self.sender.bounded_send(value, limit, |locked| { + let will_wake = locked.send_wakers.iter().any(|w| w.will_wake(cx.waker())); + if !will_wake { + locked.send_wakers.push(cx.waker().clone()); + } + ControlFlow::Break(()) + }); + match send_result { + Ok(()) => Poll::Ready(Ok(())), + Err(TrySendError::Disconnected(value)) => Poll::Ready(Err(value)), + Err(TrySendError::Full(value)) => { + self.value = Some(value); + Poll::Pending } } - None => { - self.sender.unbounded_send(value); - Poll::Ready(Ok(())) - } + } else { + self.sender.unbounded_send(value); + Poll::Ready(Ok(())) } } } +/// A future that receives a [`RootedValue`] when awaited. +#[must_use = "Futures must be awaited to execute"] pub struct RecvAsync<'a>(&'a ValueReceiver); impl Future for RecvAsync<'_> { diff --git a/muse-lang/src/runtime/string.rs b/muse-lang/src/runtime/string.rs index 76d53cb..162c932 100644 --- a/muse-lang/src/runtime/string.rs +++ b/muse-lang/src/runtime/string.rs @@ -126,7 +126,7 @@ pub static STRING_TYPE: RustType = RustType::new("String", |t| { Ok(Value::dynamic( List::from(vec![Value::dynamic( MuseString::from(String::default()), - &vm, + vm, )]), vm, )) @@ -136,7 +136,7 @@ pub static STRING_TYPE: RustType = RustType::new("String", |t| { Ok(Value::dynamic( haystack .split(&*needle) - .map(|segment| Value::dynamic(MuseString::from(segment), &vm)) + .map(|segment| Value::dynamic(MuseString::from(segment), vm)) .collect::(), vm, )) @@ -146,7 +146,7 @@ pub static STRING_TYPE: RustType = RustType::new("String", |t| { Ok(Value::dynamic( needle .split(&haystack) - .map(|segment| Value::dynamic(MuseString::from(segment), &vm)) + .map(|segment| Value::dynamic(MuseString::from(segment), vm)) .collect::(), vm, )) @@ -171,7 +171,7 @@ pub static STRING_TYPE: RustType = RustType::new("String", |t| { let mut combined = String::with_capacity(lhs.len() + rhs.len()); combined.push_str(&lhs); combined.push_str(&rhs); - Ok(Value::dynamic(MuseString::from(combined), &vm)) + Ok(Value::dynamic(MuseString::from(combined), vm)) } } else { let rhs = rhs.to_string(vm)?.try_upgrade(vm.guard())?; diff --git a/muse-lang/src/runtime/symbol.rs b/muse-lang/src/runtime/symbol.rs index 1c6426e..488c582 100644 --- a/muse-lang/src/runtime/symbol.rs +++ b/muse-lang/src/runtime/symbol.rs @@ -409,6 +409,12 @@ impl Deref for StaticSymbol { } } +impl<'a> From<&'a StaticSymbol> for SymbolRef { + fn from(value: &'a StaticSymbol) -> Self { + value.downgrade() + } +} + /// A type that contains a list of symbols. pub trait SymbolList { /// The iterator used for [`into_symbols`](Self::into_symbols). diff --git a/muse-lang/src/runtime/value.rs b/muse-lang/src/runtime/value.rs index 7c8c8a5..a0bf6e4 100644 --- a/muse-lang/src/runtime/value.rs +++ b/muse-lang/src/runtime/value.rs @@ -867,7 +867,7 @@ impl Value { } /// Moves `value` into the virtual machine. - pub fn dynamic<'guard, T>(value: T, guard: impl AsRef>) -> Self + pub fn dynamic<'guard, T>(value: T, guard: &impl AsRef>) -> Self where T: DynamicValue + Trace, { @@ -1569,7 +1569,7 @@ pub struct AnyDynamic(pub(crate) AnyRef); impl AnyDynamic { /// Returns `value` as a garbage collected value that can be used in Muse. - pub fn new<'guard, T>(value: T, guard: impl AsRef>) -> Self + pub fn new<'guard, T>(value: T, guard: &impl AsRef>) -> Self where T: DynamicValue + Trace, { @@ -2009,7 +2009,7 @@ impl AnyDynamicRoot { AnyDynamic(self.0.as_any()) } /// Returns `value` as a garbage collected value that can be used in Muse. - pub fn new<'guard, T>(value: T, guard: impl AsRef>) -> Self + pub fn new<'guard, T>(value: T, guard: &impl AsRef>) -> Self where T: DynamicValue + Trace, { @@ -2068,7 +2068,7 @@ where /// Moves `value` into the garbage collector and returns a rooted reference /// to it. #[must_use] - pub fn new<'guard>(value: T, guard: impl AsRef>) -> Self { + pub fn new<'guard>(value: T, guard: &impl AsRef>) -> Self { Self(Root::new(Custom(value), guard)) } @@ -2151,7 +2151,7 @@ where /// Moves `value` into the garbage collector and returns a weak reference to /// it. #[must_use] - pub fn new<'guard>(value: T, guard: impl AsRef>) -> Self { + pub fn new<'guard>(value: T, guard: &impl AsRef>) -> Self { Self(Ref::new(Custom(value), guard)) } @@ -4399,7 +4399,7 @@ impl CustomType for AsyncFunction { vm.allocate(1)?; vm.current_frame_mut()[0] = Value::dynamic( ValueFuture(Arc::new(Mutex::new(Box::pin(future)))), - &vm, + vm.guard(), ); vm.jump_to(1); } @@ -4538,7 +4538,7 @@ impl RootedValue { } /// Moves `value` into the virtual machine. - pub fn dynamic<'guard, T>(value: T, guard: impl AsRef>) -> Self + pub fn dynamic<'guard, T>(value: T, guard: &impl AsRef>) -> Self where T: DynamicValue + Trace, { diff --git a/muse-lang/src/vm.rs b/muse-lang/src/vm.rs index 1625634..35626c5 100644 --- a/muse-lang/src/vm.rs +++ b/muse-lang/src/vm.rs @@ -1075,7 +1075,7 @@ impl<'context, 'guard> VmContext<'context, 'guard> { }; function.module = Some(ModuleId(0)); - self.declare_inner(name, Value::dynamic(function, &self), true, Access::Public) + self.declare_inner(name, Value::dynamic(function, self), true, Access::Public) } /// Resolves the value at `path`. @@ -1948,7 +1948,7 @@ impl VmContext<'_, '_> { function .to_function(self.guard) .in_module(self.frames[self.current_frame].module), - &self, + self, ) }) .ok_or(Fault::InvalidOpcode), @@ -1960,7 +1960,7 @@ impl VmContext<'_, '_> { .map(|ty| { Value::dynamic( ty.load(self.guard, self.frames[self.current_frame].module), - &self, + self, ) }) .ok_or(Fault::InvalidOpcode), @@ -1972,7 +1972,7 @@ impl VmContext<'_, '_> { .get(v) .ok_or(Fault::InvalidOpcode)?; let ty = ty.load(self)?; - Ok(Value::dynamic(ty, &self)) + Ok(Value::dynamic(ty, self)) } } } diff --git a/muse-reactor/Cargo.toml b/muse-reactor/Cargo.toml index 5f3f4af..3d461d1 100644 --- a/muse-reactor/Cargo.toml +++ b/muse-reactor/Cargo.toml @@ -20,3 +20,6 @@ tracing = { workspace = true, optional = true } [dev-dependencies] tracing-subscriber = { workspace = true } + +[lints] +workspace = true diff --git a/muse-reactor/src/lib.rs b/muse-reactor/src/lib.rs index 213cc28..a31040a 100644 --- a/muse-reactor/src/lib.rs +++ b/muse-reactor/src/lib.rs @@ -15,7 +15,7 @@ //! use muse_lang::runtime::value::{Primitive, RootedValue}; //! //! // Create a new reactor for tasks to run in. -//! let reactor = Reactor::new(); +//! let reactor = Reactor::spawn(); //! //! // Spawn a task that computes 1 + 2 //! let task = reactor.spawn_source("1 + 2").unwrap(); @@ -44,7 +44,7 @@ //! use std::time::Duration; //! //! // Create a new reactor for tasks to run in. -//! let reactor = Reactor::new(); +//! let reactor = Reactor::spawn(); //! //! // Create a budget pool that we can spawn tasks within. //! let pool = reactor.create_budget_pool(BudgetPoolConfig::default()).unwrap(); @@ -73,6 +73,7 @@ use std::fmt::{Debug, Write}; use std::future::Future; use std::marker::PhantomData; use std::num::NonZeroUsize; +use std::ops::ControlFlow; use std::panic::{self, AssertUnwindSafe, PanicInfo}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; @@ -108,9 +109,10 @@ mod mock_tracing; static PANIC_HOOK_INSTALL: OnceLock<()> = OnceLock::new(); thread_local! { - static PANIC_INFO: Cell)>> = Cell::new(None); + static PANIC_INFO: Cell)>> = const { Cell::new(None) }; } +#[must_use] pub struct Builder { vm_source: Option>>, threads: usize, @@ -119,6 +121,12 @@ pub struct Builder { _work: PhantomData, } +impl Default for Builder { + fn default() -> Self { + Self::new() + } +} + impl Builder { pub fn new() -> Self { Self { @@ -158,6 +166,7 @@ where self } + #[must_use] pub fn finish(self) -> ReactorHandle { PANIC_HOOK_INSTALL.get_or_init(|| { let default_hook = panic::take_hook(); @@ -210,7 +219,7 @@ where spawner: spawn_send.clone(), handle: thread::Builder::new() .name(thread_name.clone()) - .spawn(move || reactor.run(spawn_send, data, parker)) + .spawn(move || reactor.run(spawn_send, data, &parker)) .expect("error spawning thread"), }); } @@ -304,8 +313,7 @@ impl Dispatcher { while let Some(command) = if let Some(budget) = self .recharge_queue .iter() - .filter_map(|id| self.recharging_budgets.get(*id)) - .next() + .find_map(|id| self.recharging_budgets.get(*id)) { self.commands.recv_deadline(budget.recharge_at).ok() } else { @@ -471,7 +479,8 @@ pub struct Reactor { } impl Reactor { - pub fn new() -> ReactorHandle { + #[must_use] + pub fn spawn() -> ReactorHandle { Self::build().finish() } @@ -512,241 +521,153 @@ where mut self, sender: Sender>, data: Arc, - parker: Parker, + parker: &Parker, ) { let canceller = TaskCanceller { canceller: Arc::new(sender), unparker: parker.unparker().clone(), }; let mut tasks = ReactorTasks::new(data); - 'outer: while !self.handle.data.shared.shutdown.load(Ordering::Relaxed) { + while !self.handle.data.shared.shutdown.load(Ordering::Relaxed) { tasks.wake_woken(); self.wake_exhausted(&mut tasks); let mut guard = CollectionGuard::acquire(); - for _ in 0..tasks.executing.len() { - let task_id = tasks.executing[0]; - let task = &mut tasks.all[task_id]; - let mut vm_context = task.vm.context(&mut guard); - let mut future = vm_context.resume_for_async(Duration::from_micros(100)); - let pinned_future = Pin::new(&mut future); - - let mut context = Context::from_waker(&task.waker); - match panic::catch_unwind(AssertUnwindSafe(|| pinned_future.poll(&mut context))) { - Ok(Poll::Ready(Ok(result))) => { - drop(future); - let result = root_result(Ok(result), &mut vm_context); - drop(vm_context); - tasks.complete_running_task(result, &mut self.budgets); - } - Ok(Poll::Ready(Err(ExecutionError::Exception(err)))) => { - drop(future); - let result = root_result(Err(err), &mut vm_context); - drop(vm_context); - tasks.complete_running_task(result, &mut self.budgets); + self.execute_executing(&mut tasks, &mut guard, parker); + + if self + .process_commands(&canceller, &mut tasks, &mut guard) + .is_break() + { + break; + } + + drop(guard); + + if !tasks.has_work() { + parker.park(); + } + } + } + + fn process_command( + &mut self, + command: ThreadCommand, + canceller: &TaskCanceller, + tasks: &mut ReactorTasks, + guard: &mut CollectionGuard<'_>, + ) { + match command { + ThreadCommand::Spawn(command) => { + let spawn = match command.what { + Spawnable::Spawn(vm) => Ok(vm), + Spawnable::SpawnSource(source) => { + self.vm_source + .compile_and_prepare(&source, guard, &self.handle) } - Ok(Poll::Ready(Err(ExecutionError::Waiting)) | Poll::Pending) => { - task.executing = false; - tasks.executing.pop_front(); + Spawnable::SpawnCall(code, args) => { + self.vm_source.prepare_call(code, args, guard, &self.handle) } - Ok(Poll::Ready(Err(ExecutionError::Timeout))) => { - // Task is still executing, but took longer than its - // time slice. Keep it in queue for the next iteration - // of the loop. - tasks.executing.rotate_left(1); + Spawnable::SpawnWork(work) => { + work.initialize(self.vm_source.as_ref(), guard, &self.handle) } - Ok(Poll::Ready(Err(ExecutionError::NoBudget))) => { - if let Some(budget) = task - .budget_pool - .and_then(|pool_id| self.budgets.get_mut(&pool_id)) - { - let allocated = budget.allocate(); - if allocated > 0 { - // We gathered some budget, give it back to the - // task and keep it executing. - vm_context.increase_budget(allocated); - tasks.executing.rotate_left(1); + }; + match spawn { + Ok(vm) => { + let mut locked = command.result.0.locked.lock(); + if locked.cancelled { + return; + } + locked.cancellation = Some(canceller.clone()); + drop(locked); + + let budget = if let Some(pool) = command.pool { + if let Some(budget) = self.budgets.get_mut(&pool) { + vm.increase_budget(budget.allocate()); + Some(budget) } else { - let mut parked_threads = - budget.pool.0.potentially_parked_threads.lock(); - // In the time it took to lock the thread, we - // might have received budget. - let allocated = budget.allocate(); - if allocated > 0 { - vm_context.increase_budget(allocated); - tasks.executing.rotate_left(1); - } else { - task.executing = false; - parked_threads - .entry(self.id) - .or_insert_with(|| parker.unparker().clone()); - budget.exhausted.push_back(task_id); - tasks.executing.pop_front(); - } + command.result.send(Err(TaskError::Exception( + RootedValue::Symbol(Symbol::from("no_budget")), + ))); + return; } } else { - drop(future); - let result = root_result( - Err(Fault::NoBudget.as_exception(&mut vm_context)), - &mut vm_context, - ); - drop(vm_context); - tasks.complete_running_task(result, &mut self.budgets); - } + None + }; + tasks.push(command.id, command.pool, vm, command.result, budget); } - Err(mut panic) => { - drop(future); - let (mut summary, backtrace) = PANIC_INFO.take().unwrap_or_default(); - if let Some(backtrace) = backtrace { - let _result = write!(&mut summary, "\n{backtrace}"); - } - let result = root_result( - Err(Value::dynamic( - List::from_iter([ - Value::from(SymbolRef::from("panic")), - Value::from(SymbolRef::from(summary)), - ]), - vm_context.guard(), - )), - &mut vm_context, - ); - drop(vm_context); - tasks.complete_running_task(result, &mut self.budgets); - while let Err(new_panic) = - panic::catch_unwind(AssertUnwindSafe(move || drop(panic))) - { - panic = new_panic; - } + Err(err) => { + let err = match err { + PrepareError::Compilation(errors) => TaskError::Compilation(errors), + PrepareError::Execution(err) => TaskError::Exception( + err.as_value().upgrade(guard).expect("just allocated"), + ), + }; + command.result.send(Err(err)); } } } - - loop { - match self.receiver.try_recv() { - Ok(command) => match command { - ThreadCommand::Spawn(command) => { - let spawn = - match command.what { - Spawnable::Spawn(vm) => Ok(vm), - Spawnable::SpawnSource(source) => self - .vm_source - .compile_and_prepare(&source, &mut guard, &self.handle), - Spawnable::SpawnCall(code, args) => self - .vm_source - .prepare_call(code, args, &mut guard, &self.handle), - Spawnable::SpawnWork(work) => work.initialize( - self.vm_source.as_ref(), - &mut guard, - &self.handle, - ), - }; - match spawn { - Ok(vm) => { - let mut locked = command.result.0.locked.lock(); - if locked.cancelled { - continue; - } - locked.cancellation = Some(canceller.clone()); - drop(locked); - - let budget = if let Some(pool) = command.pool { - if let Some(budget) = self.budgets.get_mut(&pool) { - vm.increase_budget(budget.allocate()); - Some(budget) - } else { - let _ = command.result.send(Err(TaskError::Exception( - RootedValue::Symbol(Symbol::from("no_budget")), - ))); - continue; - } - } else { - None - }; - tasks.push( - command.id, - command.pool, - vm, - command.result, - budget, - ); - } - Err(err) => { - let err = match err { - PrepareError::Compilation(errors) => { - TaskError::Compilation(errors) - } - PrepareError::Execution(err) => TaskError::Exception( - err.as_value().upgrade(&guard).expect("just allocated"), - ), - }; - let _result = command.result.send(Err(err)); - } - } - } - ThreadCommand::NewBudgetPool(pool) => { - self.budgets.insert( - pool.0.id, - ThreadBudget { - pool, - exhausted_at: Cell::new(0), - exhausted: VecDeque::new(), - tasks: Set::new(), - }, - ); - } - ThreadCommand::DestroyBudgetPool(pool) => { - let Some(mut budget) = self.budgets.remove(&pool) else { - continue; - }; - for paused in std::mem::take(&mut budget.value.tasks).drain() { - tasks.complete_task( - paused, - Err(TaskError::Exception(RootedValue::Symbol(Symbol::from( - "no_budget", - )))), - &mut self.budgets, - ); - } - } - ThreadCommand::Cancel(global_id) => { - let Some(task_id) = tasks.registered.get(&global_id).copied() else { - continue; - }; - let task = tasks.complete_task( - task_id, - Err(TaskError::Cancelled), - &mut self.budgets, - ); - - if task.executing { - let (index, _) = tasks - .executing - .iter() - .enumerate() - .find(|(_, id)| task_id == **id) - .expect("task is executing"); - tasks.executing.remove(index); - } - } + ThreadCommand::NewBudgetPool(pool) => { + self.budgets.insert( + pool.0.id, + ThreadBudget { + pool, + exhausted_at: Cell::new(0), + exhausted: VecDeque::new(), + tasks: Set::new(), }, - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => break 'outer, + ); + } + ThreadCommand::DestroyBudgetPool(pool) => { + let Some(mut budget) = self.budgets.remove(&pool) else { + return; + }; + for paused in std::mem::take(&mut budget.value.tasks).drain() { + tasks.complete_task( + paused, + Err(TaskError::Exception(RootedValue::Symbol(Symbol::from( + "no_budget", + )))), + &mut self.budgets, + ); } } - - drop(guard); - - if tasks.executing.is_empty() { - let woken = tasks.woken.tasks.lock(); - if woken.is_empty() { - drop(woken); - parker.park(); + ThreadCommand::Cancel(global_id) => { + let Some(task_id) = tasks.registered.get(&global_id).copied() else { + return; + }; + let task = + tasks.complete_task(task_id, Err(TaskError::Cancelled), &mut self.budgets); + + if task.executing { + let (index, _) = tasks + .executing + .iter() + .enumerate() + .find(|(_, id)| task_id == **id) + .expect("task is executing"); + tasks.executing.remove(index); } } } } + fn process_commands( + &mut self, + canceller: &TaskCanceller, + tasks: &mut ReactorTasks, + guard: &mut CollectionGuard<'_>, + ) -> ControlFlow<()> { + loop { + match self.receiver.try_recv() { + Ok(command) => self.process_command(command, canceller, tasks, guard), + Err(TryRecvError::Empty) => return ControlFlow::Continue(()), + Err(TryRecvError::Disconnected) => return ControlFlow::Break(()), + } + } + } + fn wake_exhausted(&mut self, tasks: &mut ReactorTasks) { - for (_, budget) in self.budgets.iter_mut() { + for (_, budget) in &mut self.budgets { let last_updated = budget.pool.0.last_updated.load(Ordering::Relaxed); if budget.exhausted_at.get() != last_updated { budget.exhausted_at.set(last_updated); @@ -760,6 +681,110 @@ where } } } + + fn execute_executing( + &mut self, + tasks: &mut ReactorTasks, + guard: &mut CollectionGuard<'_>, + parker: &Parker, + ) { + for _ in 0..tasks.executing.len() { + let task_id = tasks.executing[0]; + let task = &mut tasks.all[task_id]; + let mut vm_context = task.vm.context(guard); + let mut future = vm_context.resume_for_async(Duration::from_micros(100)); + let pinned_future = Pin::new(&mut future); + + let mut context = Context::from_waker(&task.waker); + match panic::catch_unwind(AssertUnwindSafe(|| pinned_future.poll(&mut context))) { + Ok(Poll::Ready(Ok(result))) => { + drop(future); + let result = root_result(Ok(result), &mut vm_context); + drop(vm_context); + tasks.complete_running_task(result, &mut self.budgets); + } + Ok(Poll::Ready(Err(ExecutionError::Exception(err)))) => { + drop(future); + let result = root_result(Err(err), &mut vm_context); + drop(vm_context); + tasks.complete_running_task(result, &mut self.budgets); + } + Ok(Poll::Ready(Err(ExecutionError::Waiting)) | Poll::Pending) => { + task.executing = false; + tasks.executing.pop_front(); + } + Ok(Poll::Ready(Err(ExecutionError::Timeout))) => { + // Task is still executing, but took longer than its + // time slice. Keep it in queue for the next iteration + // of the loop. + tasks.executing.rotate_left(1); + } + Ok(Poll::Ready(Err(ExecutionError::NoBudget))) => { + if let Some(budget) = task + .budget_pool + .and_then(|pool_id| self.budgets.get_mut(&pool_id)) + { + let allocated = budget.allocate(); + if allocated > 0 { + // We gathered some budget, give it back to the + // task and keep it executing. + vm_context.increase_budget(allocated); + tasks.executing.rotate_left(1); + } else { + let mut parked_threads = + budget.pool.0.potentially_parked_threads.lock(); + // In the time it took to lock the thread, we + // might have received budget. + let allocated = budget.allocate(); + if allocated > 0 { + vm_context.increase_budget(allocated); + tasks.executing.rotate_left(1); + } else { + task.executing = false; + parked_threads + .entry(self.id) + .or_insert_with(|| parker.unparker().clone()); + budget.exhausted.push_back(task_id); + tasks.executing.pop_front(); + } + } + } else { + drop(future); + let result = root_result( + Err(Fault::NoBudget.as_exception(&mut vm_context)), + &mut vm_context, + ); + drop(vm_context); + tasks.complete_running_task(result, &mut self.budgets); + } + } + Err(mut panic) => { + drop(future); + let (mut summary, backtrace) = PANIC_INFO.take().unwrap_or_default(); + if let Some(backtrace) = backtrace { + let _result = write!(&mut summary, "\n{backtrace}"); + } + let result = root_result( + Err(Value::dynamic( + List::from_iter([ + Value::from(SymbolRef::from("panic")), + Value::from(SymbolRef::from(summary)), + ]), + vm_context.guard(), + )), + &mut vm_context, + ); + drop(vm_context); + tasks.complete_running_task(result, &mut self.budgets); + while let Err(new_panic) = + panic::catch_unwind(AssertUnwindSafe(move || drop(panic))) + { + panic = new_panic; + } + } + } + } + } } struct ThreadBudget { @@ -826,6 +851,7 @@ impl ResultHandle { self.0.sync.notify_all(); } + #[allow(clippy::needless_pass_by_value)] fn recv(&self, deadline: Deadline) -> Deadline::Result where Deadline: ResultDeadline, @@ -995,6 +1021,9 @@ impl ReactorTasks { id } + fn has_work(&self) -> bool { + !(self.executing.is_empty() && self.woken.tasks.lock().is_empty()) + } fn complete_running_task( &mut self, result: Result, @@ -1011,7 +1040,7 @@ impl ReactorTasks { budgets: &mut Map, ) -> ReactorTask { let task = self.all.remove(task_id).expect("task missing"); - let _result = task.result.send(result); + task.result.send(result); self.registered.remove(&task.global_id); if let Some(budget) = task.budget_pool.and_then(|p| budgets.get_mut(&p)) { budget.tasks.remove(&task_id); @@ -1030,7 +1059,7 @@ impl ReactorTasks { }; if !task.executing { task.executing = true; - self.executing.push_back(id) + self.executing.push_back(id); } } drop(woken); @@ -1074,14 +1103,17 @@ impl TaskHandle { self.result.recv(()) } + #[must_use] pub fn try_join(&self) -> Option> { self.result.try_recv() } + #[must_use] pub fn try_join_until(&self, deadline: Instant) -> Option> { self.result.recv(deadline) } + #[must_use] pub fn try_join_for(&self, duration: Duration) -> Option> { self.try_join_until(Instant::now() + duration) } @@ -1149,6 +1181,7 @@ impl ReactorHandle where Work: WorkUnit, { + #[must_use] pub fn runtime_module_in( &self, parent: Dynamic, @@ -1559,6 +1592,7 @@ impl BudgetPoolHandle where Work: WorkUnit, { + #[must_use] pub fn id(&self) -> BudgetPoolId { self.0.pool.0.id } @@ -1593,13 +1627,15 @@ where } pub fn increase_budget(&self, amount: usize) { - self.0.pool.increase_budget(amount) + self.0.pool.increase_budget(amount); } + #[must_use] pub fn remaining_budget(&self) -> usize { self.0.pool.0.budget.load(Ordering::Relaxed) } + #[must_use] pub fn reactor(&self) -> &ReactorHandle { &self.0.reactor } @@ -1631,6 +1667,7 @@ impl Drop for BudgetPoolHandleData { } #[non_exhaustive] +#[must_use] pub struct BudgetPoolConfig { pub maximum: usize, pub allocation_size: usize, @@ -1677,6 +1714,7 @@ impl BudgetPoolConfig { self } + #[must_use] pub fn recharges(&self) -> bool { self.recharge_every > Duration::ZERO && self.recharge_amount > 0 } diff --git a/muse-reactor/src/tests.rs b/muse-reactor/src/tests.rs index b3c585a..4f240bb 100644 --- a/muse-reactor/src/tests.rs +++ b/muse-reactor/src/tests.rs @@ -21,7 +21,7 @@ fn initialize_tracing() { #[test] fn works() { initialize_tracing(); - let reactor = Reactor::new(); + let reactor = Reactor::spawn(); let task = reactor.spawn_source("1 + 2").unwrap(); assert_eq!( task.join().unwrap(), @@ -32,7 +32,7 @@ fn works() { #[test] fn spawning() { initialize_tracing(); - let reactor = Reactor::new(); + let reactor = Reactor::spawn(); let task = reactor .spawn_source( r" @@ -124,7 +124,7 @@ fn automatic_recharge() { #[test] fn spawn_err() { initialize_tracing(); - let reactor = Reactor::new(); + let reactor = Reactor::spawn(); let task = reactor.spawn_source("invalid source code").unwrap(); match task.join() { Err(TaskError::Compilation(errors)) => assert_eq!( @@ -145,7 +145,7 @@ fn spawn_err() { #[test] fn task_cancellation() { initialize_tracing(); - let reactor = Reactor::new(); + let reactor = Reactor::spawn(); // Spawn a task with an infinite loop let task = reactor.spawn_source("loop {}").unwrap(); // Wait a bit to make sure it's running. @@ -165,7 +165,7 @@ fn task_cancellation() { #[test] fn pool_cancellation() { initialize_tracing(); - let reactor = Reactor::new(); + let reactor = Reactor::spawn(); // Spawn a task with an infinite loop let pool = reactor .create_budget_pool(BudgetPoolConfig::default()) @@ -192,10 +192,10 @@ fn task_panic() { let reactor = Reactor::build() .new_vm( |guard: &mut CollectionGuard<'_>, _reactor: &ReactorHandle| { - let vm = Vm::new(&guard); + let vm = Vm::new(guard); vm.declare( "panics", - Value::dynamic(RustFunction::new(|_vm, _arity| panic!()), &guard), + Value::dynamic(RustFunction::new(|_vm, _arity| panic!()), guard), guard, )?; Ok(vm) diff --git a/muse-ui/src/lib.rs b/muse-ui/src/lib.rs index 68a12b0..95c262f 100644 --- a/muse-ui/src/lib.rs +++ b/muse-ui/src/lib.rs @@ -28,7 +28,7 @@ pub fn install(vm: &Vm, guard: &mut CollectionGuard<'_>) { Err(Fault::IncorrectNumberOfArguments) } }), - &guard, + guard, ), guard, )