diff --git a/Cargo.toml b/Cargo.toml index 9e901da9d..12090bc24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ native-dialogs = ["dep:rfd"] kludgine = { git = "https://github.com/khonsulabs/kludgine", features = [ "app", ] } -figures = { version = "0.4.0" } +figures = { version = "0.4.2" } alot = "0.3.2" interner = "0.2.1" kempt = "0.2.1" diff --git a/examples/7guis-counter.rs b/examples/7guis-counter.rs new file mode 100644 index 000000000..8b32f77f2 --- /dev/null +++ b/examples/7guis-counter.rs @@ -0,0 +1,28 @@ +use cushy::value::Dynamic; +use cushy::widget::MakeWidget; +use cushy::widgets::label::Displayable; +use cushy::Run; +use figures::units::Lp; + +fn main() -> cushy::Result { + let count = Dynamic::new(0_usize); + + count + .to_label() + .expand() + .and( + "Count" + .into_button() + .on_click(move |_| { + *count.lock() += 1; + }) + .expand(), + ) + .into_columns() + .pad() + .width(Lp::inches(3)) + .into_window() + .titled("Counter") + .resize_to_fit(true) + .run() +} diff --git a/examples/7guis-temperature-converter.rs b/examples/7guis-temperature-converter.rs new file mode 100644 index 000000000..8eab5192f --- /dev/null +++ b/examples/7guis-temperature-converter.rs @@ -0,0 +1,30 @@ +use cushy::value::Dynamic; +use cushy::widget::MakeWidget; +use cushy::widgets::input::InputValue; +use cushy::Run; +use figures::units::Lp; + +fn main() -> cushy::Result { + let celsius = Dynamic::new(100f32); + let farenheit = celsius.linked( + |celsius| *celsius * 9. / 5. + 32., + |farenheit| (*farenheit - 32.) * 5. / 9., + ); + + let celsius_string = celsius.linked_string(); + let farenheight_string = farenheit.linked_string(); + + celsius_string + .into_input() + .expand() + .and("Celsius =") + .and(farenheight_string.into_input().expand()) + .and("Farenheit") + .into_columns() + .pad() + .width(Lp::inches(4)) + .into_window() + .titled("Temperature Converter") + .resize_to_fit(true) + .run() +} diff --git a/examples/7guis-timer.rs b/examples/7guis-timer.rs new file mode 100644 index 000000000..df5853920 --- /dev/null +++ b/examples/7guis-timer.rs @@ -0,0 +1,86 @@ +use std::time::{Duration, Instant}; + +use cushy::value::{Destination, Dynamic, DynamicReader, Source}; +use cushy::widget::MakeWidget; +use cushy::widgets::progress::Progressable; +use cushy::widgets::slider::Slidable; +use cushy::{Open, PendingApp}; +use figures::units::Lp; + +#[derive(PartialEq, Debug, Clone)] +struct Timer { + started_at: Instant, + duration: Duration, +} + +impl Default for Timer { + fn default() -> Self { + Self { + started_at: Instant::now() - Duration::from_secs(1), + duration: Duration::from_secs(1), + } + } +} + +fn main() -> cushy::Result { + let pending = PendingApp::default(); + let cushy = pending.cushy().clone(); + let _runtime = cushy.enter_runtime(); + + let timer = Dynamic::::default(); + let duration = timer.linked_accessor(|timer| &timer.duration, |timer| &mut timer.duration); + + let elapsed = spawn_timer(&timer); + let duration_label = duration.map_each(|duration| format!("{}s", duration.as_secs_f32())); + + elapsed + .progress_bar_between( + duration + .weak_clone() + .map_each_cloned(|duration| Duration::ZERO..=duration), + ) + .fit_horizontally() + .and(duration_label) + .and( + "Duration" + .and( + duration + .slider_between(Duration::ZERO, Duration::from_secs(30)) + .expand_horizontally(), + ) + .into_columns(), + ) + .and("Reset".into_button().on_click(move |_| { + timer.lock().started_at = Instant::now(); + })) + .into_rows() + .pad() + .width(Lp::inches(4)) + .into_window() + .titled("Timer") + .resize_to_fit(true) + .run_in(pending) +} + +fn spawn_timer(timer: &Dynamic) -> DynamicReader { + let timer = timer.create_reader(); + let elapsed = Dynamic::new(timer.map_ref(|timer| timer.duration)); + let elapsed_reader = elapsed.weak_clone().into_reader(); + std::thread::spawn(move || loop { + let settings = timer.get(); + + // Update the elapsed time, clamping to the duration of the timer. + let duration_since_started = settings.started_at.elapsed().min(settings.duration); + elapsed.set(duration_since_started); + + if duration_since_started < settings.duration { + // The timer is still running, "tick" the timer by sleeping and + // allow the loop to continue. + std::thread::sleep(Duration::from_millis(16)); + } else { + // Block the thread until the timer settings have been changed. + timer.block_until_updated(); + } + }); + elapsed_reader +} diff --git a/src/animation.rs b/src/animation.rs index 23c62060d..1f854ee9c 100644 --- a/src/animation.rs +++ b/src/animation.rs @@ -1053,6 +1053,27 @@ where } } +impl LinearInterpolate for Duration { + #[allow(clippy::cast_precision_loss)] + fn lerp(&self, target: &Self, percent: f32) -> Self { + let nanos = self.as_nanos().lerp(&target.as_nanos(), percent); + let seconds = nanos / 1_000_000_000; + let subsec_nanos = nanos % 1_000_000_000; + Self::new( + u64::try_from(seconds).unwrap_or(u64::MAX), + subsec_nanos as u32, + ) + } +} +impl PercentBetween for Duration { + fn percent_between(&self, min: &Self, max: &Self) -> ZeroToOne { + let this = self.as_secs_f32(); + let min = min.as_secs_f32(); + let max = max.as_secs_f32(); + this.percent_between(&min, &max) + } +} + #[test] fn integer_lerps() { #[track_caller] @@ -1156,14 +1177,16 @@ macro_rules! impl_percent_between { clippy::cast_lossless )] fn percent_between(&self, min: &Self, max: &Self) -> ZeroToOne { - assert!(min <= max, "percent_between requires min <= max"); - assert!( - self >= min && self <= max, - "self must satisfy min <= self <= max" - ); - - let range = max.$sub(*min); - ZeroToOne::from(self.$sub(*min) as $float / range as $float) + if min >= max { + return ZeroToOne::ZERO; + } else if self <= min { + ZeroToOne::ZERO + } else if self >= max { + ZeroToOne::ONE + } else { + let range = max.$sub(*min); + ZeroToOne::from(self.$sub(*min) as $float / range as $float) + } } } }; diff --git a/src/lib.rs b/src/lib.rs index 00e339d61..204f0195d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -328,7 +328,8 @@ fn initialize_tracing() { Targets::new() .with_target("winit", Level::ERROR) .with_target("wgpu", Level::ERROR) - .with_target("naga", Level::ERROR), + .with_target("naga", Level::ERROR) + .with_default(MAX_LEVEL), ) .try_init(); } diff --git a/src/value.rs b/src/value.rs index a150fe636..0134c0ad7 100644 --- a/src/value.rs +++ b/src/value.rs @@ -1,21 +1,23 @@ //! Types for storing and interacting with values in Widgets. -use std::cell::{Ref, RefCell, RefMut}; -use std::collections::HashMap; +use std::cell::{Cell, Ref, RefCell, RefMut}; +use std::collections::{HashMap, VecDeque}; use std::fmt::{self, Debug, Display}; use std::future::Future; use std::hash::{BuildHasher, Hash}; use std::ops::{Add, AddAssign, Deref, DerefMut, Not}; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self}; use std::sync::{Arc, Weak}; use std::task::{Poll, Waker}; -use std::thread::{self, ThreadId}; +use std::thread::ThreadId; use std::time::{Duration, Instant}; use ahash::{AHashMap, AHashSet}; use alot::{LotId, Lots}; use intentional::Assert; -use kempt::{Map, Sort}; +use kempt::{map, Map, Sort}; use parking_lot::{Condvar, Mutex, MutexGuard}; use crate::animation::{AnimationHandle, DynamicTransition, IntoAnimate, LinearInterpolate, Spawn}; @@ -27,6 +29,36 @@ use crate::widget::{ use crate::widgets::checkbox::CheckboxState; use crate::widgets::{Checkbox, Radio, Select, Space, Switcher}; use crate::window::WindowHandle; +use crate::Lazy; + +static CALLBACK_EXECUTORS: Mutex>> = Mutex::new(Map::new()); + +fn execute_callbacks( + lock: Arc, + callbacks: &mut CallbacksList, +) -> Result<(), DeadlockError> { + let mut executors = CALLBACK_EXECUTORS.lock(); + let key = Arc::as_ptr(&lock) as usize; + match executors.entry(key) { + map::Entry::Occupied(_) => return Err(DeadlockError), + map::Entry::Vacant(entry) => { + entry.insert(lock); + } + } + drop(executors); + + // Invoke all callbacks, removing those that report an + // error. + callbacks.invoked_at = Instant::now(); + callbacks + .callbacks + .drain_filter(|callback| callback.changed().is_err()); + + let mut executors = CALLBACK_EXECUTORS.lock(); + executors.remove(&key); + + Ok(()) +} /// A source of one or more `T` values. pub trait Source { @@ -132,6 +164,27 @@ pub trait Source { self.get() } + /// Executes `on_change` when the contents of this dynamic are updated. + /// + /// Returning `Err(CallbackDisconnected)` will prevent the callback from + /// being invoked again. + fn on_change_try(&self, on_change: F) -> CallbackHandle + where + T: Send + 'static, + F: FnMut() -> Result<(), CallbackDisconnected> + Send + 'static; + + /// Executes `on_change` when the contents of this dynamic are updated. + fn on_change(&self, mut on_change: F) -> CallbackHandle + where + T: Send + 'static, + F: FnMut() + Send + 'static, + { + self.on_change_try(move || { + on_change(); + Ok(()) + }) + } + /// Attaches `for_each` to this value so that it is invoked each time the /// source's contents are updated. /// @@ -372,7 +425,7 @@ pub trait Source { mapped.set_source( self.for_each_cloned_try(move |value| { let mapped = mapped_weak.upgrade().ok_or(CallbackDisconnected)?; - *mapped.lock() = value.clone(); + *mapped.lock() = value; Ok(()) }) .weak(), @@ -618,6 +671,14 @@ impl Source for Arc> { })) } + fn on_change_try(&self, on_change: F) -> CallbackHandle + where + T: Send + 'static, + F: FnMut() -> Result<(), CallbackDisconnected> + Send + 'static, + { + dynamic_for_each(self, on_change) + } + fn for_each_subsequent_generational_try(&self, mut for_each: F) -> CallbackHandle where T: Send + 'static, @@ -659,6 +720,14 @@ impl Source for Dynamic { self.0.try_map_generational(map) } + fn on_change_try(&self, on_change: F) -> CallbackHandle + where + T: Send + 'static, + F: FnMut() -> Result<(), CallbackDisconnected> + Send + 'static, + { + dynamic_for_each(&self.0, on_change) + } + fn for_each_subsequent_generational_try(&self, for_each: F) -> CallbackHandle where T: Send + 'static, @@ -689,6 +758,14 @@ impl Source for DynamicReader { }) } + fn on_change_try(&self, on_change: F) -> CallbackHandle + where + T: Send + 'static, + F: FnMut() -> Result<(), CallbackDisconnected> + Send + 'static, + { + dynamic_for_each(&self.source, on_change) + } + fn for_each_subsequent_generational_try(&self, for_each: F) -> CallbackHandle where T: Send + 'static, @@ -836,6 +913,24 @@ impl Source for Owned { })) } + fn on_change_try(&self, mut on_change: F) -> CallbackHandle + where + T: Send + 'static, + F: FnMut() -> Result<(), CallbackDisconnected> + Send + 'static, + { + let mut callbacks = self.callbacks.active.lock(); + CallbackHandle(CallbackHandleInner::Single(CallbackHandleData { + id: Some( + callbacks.push(Box::new(move |g: DynamicGuard<'_, T, true>| { + drop(g); + on_change() + })), + ), + owner: None, + callbacks: self.callbacks.clone(), + })) + } + fn for_each_subsequent_generational_try(&self, for_each: F) -> CallbackHandle where T: Send + 'static, @@ -979,7 +1074,7 @@ where pub fn invoke<'a, U>( &self, user: &'a mut U, - value: impl for<'b> Fn(&'b mut U) -> DynamicOrOwnedGuard<'b, T>, + value: impl for<'b> Fn(&'b mut U) -> DynamicOrOwnedGuard<'b, T, true>, ) { let mut callbacks = self.active.lock(); callbacks.drain_filter(|callback| { @@ -1025,10 +1120,11 @@ pub struct Dynamic(Arc>); impl Dynamic { /// Creates a new instance wrapping `value`. pub fn new(value: T) -> Self { + let state = State::new(value); + let lock = state.callbacks.lock.clone(); Self(Arc::new(DynamicData { - state: Mutex::new(State::new(value)), - during_callback_state: Mutex::default(), - sync: Condvar::default(), + state: Mutex::new(state), + lock, })) } @@ -1066,7 +1162,7 @@ impl Dynamic { /// thread. #[must_use] pub fn readers(&self) -> usize { - self.state().expect("deadlocked").readers + self.state::().expect("deadlocked").readers } /// Returns a new dynamic that has its contents linked with `self` by the @@ -1142,6 +1238,73 @@ impl Dynamic { self.linked(ToString::to_string, |s: &String| s.parse().ok()) } + /// Returns a dynamic that is synchronized with a borrowed value from + /// `self`. + /// + /// When the returned dynamic is updated, `self` will be updated using + /// `get_mut`. + pub fn linked_accessor(&self, get: Getter, get_mut: Setter) -> Dynamic + where + T: Send + 'static, + U: PartialEq + Clone + Send + 'static, + Getter: Fn(&T) -> &U + Send + Clone + 'static, + Setter: Fn(&mut T) -> &mut U + Send + 'static, + { + let ignore_changes = Arc::new(AtomicBool::new(false)); + + let linked = Dynamic::new(self.map_ref(|source| get(source).clone())); + let weak_linked = linked.downgrade(); + let weak_source = self.downgrade(); + + linked.set_source(self.for_each_generational_try({ + let ignore_changes = ignore_changes.clone(); + let get = get.clone(); + move |source| { + if ignore_changes.load(Ordering::Relaxed) { + return Ok(()); + } + + let linked = weak_linked.upgrade().ok_or(CallbackDisconnected)?; + let new_value = get(&*source).clone(); + drop(source); + + if let Ok(mut linked) = linked.try_lock() { + if *linked != new_value { + ignore_changes.store(true, Ordering::Relaxed); + *linked = new_value; + drop(linked); + ignore_changes.store(false, Ordering::Relaxed); + } + } + Ok(()) + } + })); + + linked + .for_each_generational_try(move |linked| { + if ignore_changes.load(Ordering::Relaxed) { + return Ok(()); + } + + let source = weak_source.upgrade().ok_or(CallbackDisconnected)?; + let new_value = linked.clone(); + drop(linked); + + if let Ok(mut source) = source.try_lock() { + if get(&*source) != &new_value { + ignore_changes.store(true, Ordering::Relaxed); + *get_mut(&mut source) = new_value; + drop(source); + ignore_changes.store(false, Ordering::Relaxed); + } + } + Ok(()) + }) + .persist(); + + linked + } + /// Sets the current `source` for this dynamic with `source`. /// /// A dynamic can have multiple source callbacks. @@ -1149,7 +1312,7 @@ impl Dynamic { /// This ensures that `source` stays active as long as any clones of `self` /// are alive. pub fn set_source(&self, source: CallbackHandle) { - self.state().assert("deadlocked").source_callback += source; + self.state::().assert("deadlocked").source_callback += source; } /// Attaches `for_each` to this value so that it is invoked each time the @@ -1201,10 +1364,11 @@ impl Dynamic { /// thread. #[must_use] pub fn create_reader(&self) -> DynamicReader { - self.state().expect("deadlocked").readers += 1; + let mut state = self.state::().expect("deadlocked"); + state.readers += 1; DynamicReader { source: self.0.clone(), - read_generation: Mutex::new(self.0.state().expect("deadlocked").wrapped.generation), + read_generation: Mutex::new(state.wrapped.generation), } } @@ -1250,29 +1414,18 @@ impl Dynamic { }) } - fn lock_inner(&self) -> DynamicGuard<'_, T, READONLY> { - let mut guard = self.0.state().expect("deadlocked"); - // Before allowing a lock, we need to ensure that the current change - // callbacks aren't executing. Otherwise, during drop of this guard, if - // we notify of changes from a second thread than one set is already - // occuring on, both sets of invocations can end up waiting on each - // other and deadlocking. By ensuring a single guard and change - // callbacks cycle can exist at any one time, we prevent this deadlock. - if !READONLY && guard.callbacks.currently_executing.lock().thread.is_some() { - let callbacks = guard.callbacks.clone(); - guard.unlocked(|| { - let current_thread_id = std::thread::current().id(); - let mut executing = callbacks.currently_executing.lock(); + fn try_lock_nonblocking( + &self, + ) -> Result, TryLockError> { + Ok(DynamicGuard { + guard: DynamicOrOwnedGuard::Dynamic(self.0.state_nonblocking()?), + accessed_mut: false, + prevent_notifications: false, + }) + } - loop { - match &executing.thread { - Some(th) if th == ¤t_thread_id => break, - None => break, - Some(_) => callbacks.sync.wait(&mut executing), - }; - } - }); - } + fn lock_inner(&self) -> DynamicGuard<'_, T, READONLY> { + let guard = self.0.state().expect("deadlocked"); DynamicGuard { guard: DynamicOrOwnedGuard::Dynamic(guard), accessed_mut: false, @@ -1280,7 +1433,9 @@ impl Dynamic { } } - fn state(&self) -> Result, DeadlockError> { + fn state( + &self, + ) -> Result, DeadlockError> { self.0.state() } @@ -1495,19 +1650,19 @@ impl Drop for Dynamic { // the only issue is that `on_disconnect` will not fire if during a map // callback on a `DynamicReader` the final reference to the source // `Dynamic`. - if let Ok(mut state) = self.state() { + if let Ok(mut state) = self.state::() { if Arc::strong_count(&self.0) == state.readers + 1 { let cleanup = state.cleanup(); drop(state); drop(cleanup); - self.0.sync.notify_all(); + self.0.lock.sync.notify_all(); } } else { // In the event that this is the rare edge case and a reader is // blocking, we want to signal that we've dropped the final // reference. - self.0.sync.notify_all(); + self.0.lock.sync.notify_all(); } } } @@ -1530,12 +1685,13 @@ impl From for Dynamic { } } -struct DynamicMutexGuard<'a, T> { +struct DynamicMutexGuard<'a, T, const READONLY: bool> { dynamic: &'a DynamicData, guard: MutexGuard<'a, State>, + released_hold: bool, } -impl Debug for DynamicMutexGuard<'_, T> +impl Debug for DynamicMutexGuard<'_, T, READONLY> where T: Debug, { @@ -1544,108 +1700,158 @@ where } } -impl<'a, T> DynamicMutexGuard<'a, T> { +impl<'a, T, const READONLY: bool> DynamicMutexGuard<'a, T, READONLY> { fn unlocked(&mut self, while_unlocked: impl FnOnce() -> R) -> R { - let previous_state = self.dynamic.during_callback_state.lock().take(); - let result = MutexGuard::unlocked(&mut self.guard, while_unlocked); + MutexGuard::unlocked(&mut self.guard, || { + let mut state = self.dynamic.lock.state.lock(); + let current_holder = state.lock_holder.take(); + // let current_executor = state.executing_callbacks.take(); + drop(state); + self.dynamic.lock.sync.notify_all(); + let result = while_unlocked(); + + let mut state = self.dynamic.lock.state.lock(); + state.lock_holder = current_holder; + result + }) + } - *self.dynamic.during_callback_state.lock() = previous_state; - result + fn release_hold(&mut self) { + self.released_hold = true; + self.dynamic.lock.state.lock().lock_holder = None; + self.dynamic.lock.sync.notify_all(); } } -impl<'a, T> Drop for DynamicMutexGuard<'a, T> { +impl<'a, T, const READONLY: bool> Drop for DynamicMutexGuard<'a, T, READONLY> { fn drop(&mut self) { - let mut during_state = self.dynamic.during_callback_state.lock(); - *during_state = None; - drop(during_state); - self.dynamic.sync.notify_all(); + if !self.released_hold { + self.release_hold(); + } } } -impl<'a, T> Deref for DynamicMutexGuard<'a, T> { +impl<'a, T, const READONLY: bool> Deref for DynamicMutexGuard<'a, T, READONLY> { type Target = State; fn deref(&self) -> &Self::Target { &self.guard } } -impl<'a, T> DerefMut for DynamicMutexGuard<'a, T> { +impl<'a, T, const READONLY: bool> DerefMut for DynamicMutexGuard<'a, T, READONLY> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.guard } } -#[derive(Debug)] -struct LockState { - locked_thread: ThreadId, -} - struct DynamicData { state: Mutex>, - during_callback_state: Mutex>, - sync: Condvar, + lock: Arc, } impl DynamicData { - fn state(&self) -> Result, DeadlockError> { - let mut during_sync = self.during_callback_state.lock(); + fn state( + &self, + ) -> Result, DeadlockError> { + self.state_inner::<_, _, READONLY, true>(|mut already_locked| { + already_locked.block(); + Ok(already_locked) + }) + } + fn state_nonblocking<'a, const READONLY: bool>( + &'a self, + ) -> Result, TryLockError<'a>> { + self.state_inner::<_, _, READONLY, false>(|state: AlreadyLocked<'a>| { + Err(TryLockError::AlreadyLocked(state)) + }) + } + + fn state_inner<'a, E, F, const READONLY: bool, const BLOCKING: bool>( + &'a self, + mut when_locked: F, + ) -> Result, E> + where + E: std::fmt::Debug + From, + F: FnMut(AlreadyLocked<'a>) -> Result, E>, + { let current_thread_id = std::thread::current().id(); - let guard = loop { - match self.state.try_lock() { - Some(g) => break g, - None => loop { - match &*during_sync { - Some(state) if state.locked_thread == current_thread_id => { - return Err(DeadlockError) - } - Some(_) => { - self.sync.wait(&mut during_sync); - } - None => break, + let mut lock = self.lock.state.lock(); + loop { + match lock.lock_holder { + None => break, + Some(holder) if holder == current_thread_id => return Err(DeadlockError.into()), + Some(_) => { + let AlreadyLocked { state, .. } = when_locked(AlreadyLocked { + state: lock, + sync: &self.lock.sync, + })?; + lock = state; + } + } + } + + lock.lock_holder = Some(current_thread_id); + + let guard = if BLOCKING { + self.state.lock() + } else { + loop { + if let Some(guard) = self.state.try_lock() { + break guard; + } + + let AlreadyLocked { state, .. } = match when_locked(AlreadyLocked { + state: lock, + sync: &self.lock.sync, + }) { + Ok(locked) => locked, + Err(other) => { + self.lock.state.lock().lock_holder = None; + return Err(other); } - }, + }; + lock = state; } }; - *during_sync = Some(LockState { - locked_thread: current_thread_id, - }); + drop(lock); + Ok(DynamicMutexGuard { dynamic: self, guard, + released_hold: false, }) } pub fn redraw_when_changed(&self, window: WindowHandle) { - let mut state = self.state().expect("deadlocked"); + let mut state = self.state::().expect("deadlocked"); state.invalidation.windows.insert(window, true); } pub fn sync_when_changed(&self, window: WindowHandle) { - let mut state = self.state().expect("deadlocked"); + let mut state = self.state::().expect("deadlocked"); state.invalidation.windows.entry(window).or_insert(false); } pub fn invalidate_when_changed(&self, window: WindowHandle, widget: WidgetId) { - let mut state = self.state().expect("deadlocked"); + let mut state = self.state::().expect("deadlocked"); state.invalidation.widgets.insert((window, widget)); } pub fn map_mut(&self, map: impl FnOnce(Mutable) -> R) -> Result { - let mut state = self.state()?; + let mut state_guard = self.state::()?; let (old, callbacks) = { - let state = &mut *state; + let state = &mut *state_guard; let mut changed = false; let result = map(Mutable::new(&mut state.wrapped.value, &mut changed)); - let callbacks = changed.then(|| state.note_changed()); + let callbacks = changed.then(|| state_guard.note_changed()); (result, callbacks) }; - drop(state); - drop(callbacks); - - self.sync.notify_all(); + drop(state_guard); + if let Some(callbacks) = callbacks { + defer_execute_callbacks(callbacks); + } Ok(old) } @@ -1653,10 +1859,10 @@ impl DynamicData { fn dynamic_for_each(this: &Arc>, map: F) -> CallbackHandle where - F: for<'a> FnMut() -> Result<(), CallbackDisconnected> + Send + 'static, + F: FnMut() -> Result<(), CallbackDisconnected> + Send + 'static, T: Send + 'static, { - let state = this.state().expect("deadlocked"); + let state = this.state::().expect("deadlocked"); let mut data = state.callbacks.callbacks.lock(); CallbackHandle(CallbackHandleInner::Single(CallbackHandleData { id: Some(data.callbacks.push(Box::new(map))), @@ -1676,7 +1882,7 @@ where T: Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.0.state() { + match self.0.state::() { Ok(state) => state.debug("Dynamic", f), Err(_) => f.debug_tuple("Dynamic").field(&"").finish(), } @@ -2025,26 +2231,30 @@ where } } -#[derive(Default)] -struct ChangeCallbacksExecutor { - thread: Option, +#[derive(Default, Debug)] +struct DynamicLockState { + lock_holder: Option, callbacks_to_remove: Vec, } +#[derive(Default, Debug)] +struct DynamicLockData { + state: Mutex, + sync: Condvar, +} + #[derive(Default)] struct ChangeCallbacksData { callbacks: Mutex, - currently_executing: Mutex, - sync: Condvar, + lock: Arc, } impl CallbackCollection for ChangeCallbacksData { fn remove(&self, id: LotId) { - let mut currently_executing = self.currently_executing.lock(); - if currently_executing.thread == Some(thread::current().id()) { - currently_executing.callbacks_to_remove.push(id); + if CallbackExecutor::is_current_thread() { + let mut state = self.lock.state.lock(); + state.callbacks_to_remove.push(id); } else { - drop(currently_executing); let mut data = self.callbacks.lock(); data.callbacks.remove(id); } @@ -2070,58 +2280,28 @@ struct ChangeCallbacks { changed_at: Instant, } -impl Drop for ChangeCallbacks { - fn drop(&mut self) { - let mut currently_executing = self.data.currently_executing.lock(); - let current_thread = thread::current().id(); - loop { - match ¤tly_executing.thread { - None => { - // No other thread is executing these callbacks. Set this - // thread as the current executor so that we can prevent - // infinite cycles. - currently_executing.thread = Some(current_thread); - drop(currently_executing); - - // Invoke the callbacks - let mut state = self.data.callbacks.lock(); - // If the callbacks have already been invoked by another - // thread such that the callbacks observed the value our - // thread wrote, we can skip the callbacks. - if state.invoked_at < self.changed_at { - state.invoked_at = Instant::now(); - // Invoke all callbacks, removing those that report an - // error. - state - .callbacks - .drain_filter(|callback| callback.changed().is_err()); - } - - // Remove ourselves as the current executor, notifying any - // other threads that are waiting. - currently_executing = self.data.currently_executing.lock(); - currently_executing.thread = None; - for callback in currently_executing.callbacks_to_remove.drain(..) { - state.callbacks.remove(callback); - } - drop(state); - drop(currently_executing); - self.data.sync.notify_all(); +impl ChangeCallbacks { + fn execute(self) { + // Invoke the callbacks + let mut data = self.data.callbacks.lock(); + // If the callbacks have already been invoked by another + // thread such that the callbacks observed the value our + // thread wrote, we can skip the callbacks. + if data.invoked_at < self.changed_at + && execute_callbacks(self.data.lock.clone(), &mut data).is_err() + { + return; + } - return; - } - Some(executing) if executing == ¤t_thread => { - // The callbacks are already running, and they triggered - // again. We ignore this rather than trying to continue to - // propagate because this can only be caused by a cycle - // happening during a callback already executing. - return; - } - Some(_) => { - self.data.sync.wait(&mut currently_executing); - } - } + // Clean up all callbacks that were disconnected while our callbacks + // were locked. + let mut state = self.data.lock.state.lock(); + for callback in state.callbacks_to_remove.drain(..) { + data.callbacks.remove(callback); } + drop(data); + drop(state); + self.data.lock.sync.notify_all(); } } @@ -2193,12 +2373,12 @@ impl DerefMut for GenerationalValue { } #[derive(Debug)] -enum DynamicOrOwnedGuard<'a, T> { - Dynamic(DynamicMutexGuard<'a, T>), +enum DynamicOrOwnedGuard<'a, T, const READONLY: bool> { + Dynamic(DynamicMutexGuard<'a, T, READONLY>), Owned(RefMut<'a, GenerationalValue>), OwnedRef(&'a mut GenerationalValue), } -impl<'a, T> DynamicOrOwnedGuard<'a, T> { +impl<'a, T, const READONLY: bool> DynamicOrOwnedGuard<'a, T, READONLY> { fn note_changed(&mut self) -> Option { match self { Self::Dynamic(guard) => Some(guard.note_changed()), @@ -2214,7 +2394,7 @@ impl<'a, T> DynamicOrOwnedGuard<'a, T> { } } -impl<'a, T> Deref for DynamicOrOwnedGuard<'a, T> { +impl<'a, T, const READONLY: bool> Deref for DynamicOrOwnedGuard<'a, T, READONLY> { type Target = GenerationalValue; fn deref(&self) -> &Self::Target { @@ -2226,7 +2406,7 @@ impl<'a, T> Deref for DynamicOrOwnedGuard<'a, T> { } } -impl<'a, T> DerefMut for DynamicOrOwnedGuard<'a, T> { +impl<'a, T, const READONLY: bool> DerefMut for DynamicOrOwnedGuard<'a, T, READONLY> { fn deref_mut(&mut self) -> &mut Self::Target { match self { Self::Dynamic(guard) => &mut guard.wrapped, @@ -2242,7 +2422,7 @@ impl<'a, T> DerefMut for DynamicOrOwnedGuard<'a, T> { /// notified of a change when this guard is dropped. #[derive(Debug)] pub struct DynamicGuard<'a, T, const READONLY: bool = false> { - guard: DynamicOrOwnedGuard<'a, T>, + guard: DynamicOrOwnedGuard<'a, T, READONLY>, accessed_mut: bool, prevent_notifications: bool, } @@ -2291,7 +2471,9 @@ impl Drop for DynamicGuard<'_, T, READONLY> { fn drop(&mut self) { if self.accessed_mut && !self.prevent_notifications { let callbacks = self.guard.note_changed(); - self.guard.unlocked(|| drop(callbacks)); + if let Some(callbacks) = callbacks { + defer_execute_callbacks(callbacks); + } } } } @@ -2389,6 +2571,14 @@ impl DynamicReader { } } + fn try_lock_nonblocking(&self) -> Result, TryLockError> { + Ok(DynamicGuard { + guard: DynamicOrOwnedGuard::Dynamic(self.source.state_nonblocking()?), + accessed_mut: false, + prevent_notifications: false, + }) + } + /// Returns the current generation that has been accessed through this /// reader. #[must_use] @@ -2405,7 +2595,12 @@ impl DynamicReader { /// thread. #[must_use] pub fn has_updated(&self) -> bool { - self.source.state().expect("deadlocked").wrapped.generation != self.read_generation() + self.source + .state::() + .expect("deadlocked") + .wrapped + .generation + != self.read_generation() } /// Blocks the current thread until the contained value has been updated or @@ -2418,13 +2613,9 @@ impl DynamicReader { /// This function panics if this value is already locked by the current /// thread. pub fn block_until_updated(&self) -> bool { - assert!( - self.source - .during_callback_state - .lock() - .as_ref() - .map_or(true, |state| state.locked_thread - != std::thread::current().id()), + assert_ne!( + self.source.lock.state.lock().lock_holder, + Some(std::thread::current().id()), "deadlocked" ); let mut state = self.source.state.lock(); @@ -2438,7 +2629,7 @@ impl DynamicReader { } // Wait for a notification of a change, which is synch - self.source.sync.wait(&mut state); + self.source.lock.sync.wait(&mut state); } } @@ -2470,7 +2661,7 @@ impl DynamicReader { where OnDisconnect: FnOnce() + Send + 'static, { - let mut state = self.source.state().expect("deadlocked"); + let mut state = self.source.state::().expect("deadlocked"); if let Some(callbacks) = &mut state.on_disconnect { callbacks.push(OnceCallback::new(|()| on_disconnect())); @@ -2506,7 +2697,7 @@ where impl Clone for DynamicReader { fn clone(&self) -> Self { - self.source.state().expect("deadlocked").readers += 1; + self.source.state::().expect("deadlocked").readers += 1; Self { source: self.source.clone(), read_generation: Mutex::new(self.read_generation()), @@ -2516,7 +2707,7 @@ impl Clone for DynamicReader { impl Drop for DynamicReader { fn drop(&mut self) { - let mut state = self.source.state().expect("deadlocked"); + let mut state = self.source.state::().expect("deadlocked"); state.readers -= 1; } } @@ -2533,7 +2724,7 @@ impl<'a, T> Future for BlockUntilUpdatedFuture<'a, T> { type Output = bool; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - let mut state = self.0.source.state().expect("deadlocked"); + let mut state = self.0.source.state::().expect("deadlocked"); if state.wrapped.generation != self.0.read_generation() { return Poll::Ready(true); } else if state.readers == Arc::strong_count(&self.0.source) @@ -3230,34 +3421,103 @@ macro_rules! impl_tuple_for_each { // The list of tuple fields excluding the one being invoked. [$($rtype:ident $rfield:tt $rvar:ident),+] ) => { - $handles += $var.for_each_subsequent((&$for_each, $(&$rvar,)+).with_clone(|(for_each, $($rvar,)+)| { - move |$var: &$type| { - $(let $rvar = $rvar.read();)+ - let mut for_each = - for_each.lock(); - (for_each)(($(&$avar,)+)); + $handles += $var.on_change_try({ + let for_each = $for_each.clone(); + $(let $avar = $avar.clone();)+ + move || { + loop { + let result = 'locks: { + $(let $avar = match $avar.read_nonblocking() { + Ok(guard) => guard, + Err($crate::value::TryLockError::WouldDeadlock) => panic!("Deadlocked"), + Err($crate::value::TryLockError::AlreadyLocked(locked)) => { + break 'locks Err(locked); + } + };)+ + + Ok(($($avar,)+)) + }; + match result { + Ok(($($avar,)+)) => { + let mut for_each = for_each.lock(); + (for_each)(($(&$avar,)+)); + return Ok(()) + } + Err(mut already_locked) => { + already_locked.block(); + } + } + } } - })); + }); }; } +/// A lock was unable to be acquired. +#[derive(Debug)] +pub enum TryLockError<'guard> { + /// Attempting to acquire this lock would have resulted in a deadlock. + WouldDeadlock, + /// The lock is currently acquired. + /// + /// The returned structure can be used to block the current thread until + /// locking can be retried. + AlreadyLocked(AlreadyLocked<'guard>), +} + +impl From for TryLockError<'_> { + fn from(_value: DeadlockError) -> Self { + Self::WouldDeadlock + } +} + +/// A lock could not be aquired without blocking. +#[derive(Debug)] +pub struct AlreadyLocked<'guard> { + state: MutexGuard<'guard, DynamicLockState>, + sync: &'guard Condvar, +} + +impl AlreadyLocked<'_> { + /// Blocks the current thread until the lock state has changed. + pub fn block(&mut self) { + self.sync.wait(&mut self.state); + } +} + /// Read access to a value stored in a [`Dynamic`]. pub trait DynamicRead { /// Returns a guard that provides exclusive, read-only access to the value /// contained wihtin this dynamic. fn read(&self) -> DynamicGuard<'_, T, true>; + + /// Attempts to obtain a guard that provides exclusive, read-only access to + /// the value contained wihtin this dynamic. + /// + /// # Errors + /// + /// Returns an error if blocking would be required to lock this dynamic. + fn read_nonblocking(&self) -> Result, TryLockError>; } impl DynamicRead for Dynamic { fn read(&self) -> DynamicGuard<'_, T, true> { self.lock_inner() } + + fn read_nonblocking(&self) -> Result, TryLockError> { + self.try_lock_nonblocking() + } } impl DynamicRead for DynamicReader { fn read(&self) -> DynamicGuard<'_, T, true> { self.lock() } + + fn read_nonblocking(&self) -> Result, TryLockError> { + self.try_lock_nonblocking() + } } impl_all_tuples!(impl_tuple_for_each, 2); @@ -3994,6 +4254,13 @@ impl Source for Watcher { self.0.try_map_generational(map) } + fn on_change_try(&self, on_change: F) -> CallbackHandle + where + F: FnMut() -> Result<(), CallbackDisconnected> + Send + 'static, + { + self.0.on_change_try(on_change) + } + fn for_each_subsequent_generational_try(&self, for_each: F) -> CallbackHandle where F: for<'a> FnMut(DynamicGuard<'_, usize, true>) -> Result<(), CallbackDisconnected> @@ -4279,26 +4546,81 @@ where } } -#[test] -fn map_cycle_is_finite() { - crate::initialize_tracing(); - let a = Dynamic::new(0_usize); +fn defer_execute_callbacks(callbacks: ChangeCallbacks) { + static THREAD_SENDER: Lazy> = Lazy::new(|| { + let (sender, receiver) = mpsc::sync_channel(256); + std::thread::spawn(move || CallbackExecutor::new(receiver).run()); + sender + }); + let _ = THREAD_SENDER.send(callbacks); +} - // This callback updates a each time a is updated with a + 1, causing an - // infinite cycle if not broken by Cushy. - a.for_each_cloned({ - let a = a.clone(); - move |current| { - a.set(current + 1); +struct CallbackExecutor { + receiver: mpsc::Receiver, + enqueued: Map<*const ChangeCallbacksData, LotId>, + callbacks: Lots, + queue: VecDeque, +} + +impl CallbackExecutor { + fn new(receiver: mpsc::Receiver) -> Self { + Self { + receiver, + enqueued: Map::new(), + callbacks: Lots::new(), + queue: VecDeque::new(), } - }) - .persist(); + } - // Cushy will invoke the callback for the first set call, but the set call - // within the callback will not cause the callback to be invoked again. - // Thus, we expect setting the value to 1 to result in `a` containing 2. - a.set(1); - assert_eq!(a.get(), 2); + fn run(mut self) { + IS_EXECUTOR_THREAD.set(true); + + // Because this is stored in a static, this likely will never return an + // error, but if it does, it's during program shutdown, and we can exit safely. + while let Ok(callbacks) = self.receiver.recv() { + self.enqueue(callbacks); + loop { + // Exhaust any pending callbacks without blocking. + while let Ok(callbacks) = self.receiver.try_recv() { + self.enqueue(callbacks); + } + + let Some(callbacks) = self.pop_callbacks() else { + break; + }; + callbacks.execute(); + } + } + } + + fn enqueue(&mut self, callbacks: ChangeCallbacks) { + match self.enqueued.entry(Arc::as_ptr(&callbacks.data)) { + map::Entry::Occupied(id) => { + self.callbacks[*id].changed_at = + self.callbacks[*id].changed_at.max(callbacks.changed_at); + } + map::Entry::Vacant(entry) => { + let id = self.callbacks.push(callbacks); + entry.insert(id); + self.queue.push_back(id); + } + } + } + + fn pop_callbacks(&mut self) -> Option { + let id = self.queue.pop_front()?; + let callbacks = self.callbacks.remove(id)?; + self.enqueued.remove(&Arc::as_ptr(&callbacks.data)); + Some(callbacks) + } + + fn is_current_thread() -> bool { + IS_EXECUTOR_THREAD.get() + } +} + +thread_local! { + static IS_EXECUTOR_THREAD: Cell = const { Cell::new(false) }; } #[test] @@ -4342,12 +4664,17 @@ fn ref_counts() { #[test] fn linked_short_circuit() { let usize = Dynamic::new(0_usize); + let usize_reader = usize.create_reader(); let string = usize.linked_string(); string.map_ref(|s| assert_eq!(s, "0")); string.set(String::from("1")); + usize_reader.block_until_updated(); assert_eq!(usize.get(), 1); + + let string_reader = string.create_reader(); usize.set(2); + string_reader.block_until_updated(); string.map_ref(|s| assert_eq!(s, "2")); } @@ -4355,17 +4682,22 @@ fn linked_short_circuit() { fn graph_shortcircuit() { let a = Dynamic::new(0_usize); let doubled = a.map_each_cloned(|a| a * 2); + let doubled_reader = doubled.create_reader(); let quadrupled = doubled.map_each_cloned(|a| a * 2); + let quadrupled_reader = quadrupled.create_reader(); a.set_source(quadrupled.for_each_cloned({ let a = a.clone(); move |quad| a.set(quad / 4) })); assert_eq!(a.get(), 0); - assert_eq!(quadrupled.get(), 0); + assert_eq!(quadrupled_reader.get(), 0); a.set(1); - assert_eq!(quadrupled.get(), 4); + quadrupled_reader.block_until_updated(); + assert_eq!(doubled_reader.get(), 2); + assert_eq!(quadrupled_reader.get(), 4); quadrupled.set(16); + doubled_reader.block_until_updated(); assert_eq!(a.get(), 4); - assert_eq!(doubled.get(), 8); + assert_eq!(doubled_reader.get(), 8); } diff --git a/src/widget.rs b/src/widget.rs index 8828cdbf4..24550123f 100644 --- a/src/widget.rs +++ b/src/widget.rs @@ -2484,7 +2484,7 @@ where /// Allows to convert collections or iterators directly into [`Stack`], [`Layers`], etc. /// /// ``` -/// use cushy::widget::{MakeWidget, MakeWidgetList}; +/// use cushy::widget::{IntoWidgetList, MakeWidget}; /// /// vec!["hello", "label"].into_rows(); /// diff --git a/src/widgets/progress.rs b/src/widgets/progress.rs index e0672113f..08df8797b 100644 --- a/src/widgets/progress.rs +++ b/src/widgets/progress.rs @@ -14,7 +14,10 @@ use crate::animation::{ }; use crate::styles::components::{EasingIn, EasingOut}; use crate::styles::ContextFreeComponent; -use crate::value::{Destination, Dynamic, IntoReadOnly, IntoReader, MapEach, ReadOnly, Source}; +use crate::value::{ + Destination, Dynamic, DynamicRead, IntoReadOnly, IntoReader, MapEach, ReadOnly, Source, + TryLockError, Watcher, +}; use crate::widget::{MakeWidget, MakeWidgetWithTag, Widget, WidgetInstance}; use crate::widgets::slider::{InactiveTrackColor, Slidable, TrackColor, TrackSize}; use crate::widgets::Data; @@ -255,9 +258,26 @@ where ReadOnly::Constant(range) => value .map_each(move |value| value.to_progress(Some(range.start()..=range.end()))) .into_reader(), - ReadOnly::Reader(range) => (&range, &value) - .map_each(|(range, value)| value.to_progress(Some(range.start()..=range.end()))) - .into_reader(), + ReadOnly::Reader(range) => { + let watcher = Watcher::default(); + watcher.watch(&value); + watcher.watch(&range); + watcher + .map_changed(move || loop { + let value = value.read(); + let range = match range.read_nonblocking() { + Ok(range) => range, + Err(TryLockError::WouldDeadlock) => unreachable!("deadlock"), + Err(TryLockError::AlreadyLocked(mut locked)) => { + drop(value); + locked.block(); + continue; + } + }; + break value.to_progress(Some(range.start()..=range.end())); + }) + .into_reader() + } }) } }