From 5ad17d6b6c31bc58992a6eb213988000962afb61 Mon Sep 17 00:00:00 2001 From: Greg Johnston Date: Thu, 1 Aug 2024 19:30:43 -0400 Subject: [PATCH] fix: correct thread-local behavior for Effects (closes #2754) --- reactive_graph/src/effect/effect.rs | 267 +++++++++++++++------------- reactive_graph/src/effect/inner.rs | 9 +- 2 files changed, 151 insertions(+), 125 deletions(-) diff --git a/reactive_graph/src/effect/effect.rs b/reactive_graph/src/effect/effect.rs index b90060475b..14737249fc 100644 --- a/reactive_graph/src/effect/effect.rs +++ b/reactive_graph/src/effect/effect.rs @@ -5,7 +5,7 @@ use crate::{ AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber, WithObserver, }, - owner::{LocalStorage, Owner, StoredValue}, + owner::{LocalStorage, Owner, Storage, StoredValue, SyncStorage}, traits::Dispose, }; use any_spawner::Executor; @@ -72,13 +72,17 @@ use std::{ /// this with a web framework, this generally means that effects **do not run on the server**. /// and you can call browser-specific APIs within the effect function without causing issues. /// If you need an effect to run on the server, use [`Effect::new_isomorphic`]. -pub struct Effect { - inner: StoredValue>>, LocalStorage>, +pub struct Effect { + inner: Option>, } -impl Dispose for Effect { +type StoredEffect = Option>>; + +impl Dispose for Effect { fn dispose(self) { - self.inner.dispose() + if let Some(inner) = self.inner { + inner.dispose() + } } } @@ -100,12 +104,22 @@ fn effect_base() -> (Receiver, Owner, Arc>) { (rx, owner, inner) } -impl Effect { +impl Effect +where + S: Storage, +{ /// Stops this effect before it is disposed. pub fn stop(self) { - drop(self.inner.try_update_value(|inner| inner.take())); + if let Some(inner) = self + .inner + .and_then(|this| this.try_update_value(|inner| inner.take())) + { + drop(inner); + } } +} +impl Effect { /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values /// that are read inside it change. /// @@ -116,11 +130,11 @@ impl Effect { where T: 'static, { - let (mut rx, owner, inner) = effect_base(); - let value = Arc::new(RwLock::new(None::)); - let mut first_run = true; + let inner = cfg!(feature = "effects").then(|| { + let (mut rx, owner, inner) = effect_base(); + let value = Arc::new(RwLock::new(None::)); + let mut first_run = true; - if cfg!(feature = "effects") { Executor::spawn_local({ let value = Arc::clone(&value); let subscriber = inner.to_any_subscriber(); @@ -145,100 +159,11 @@ impl Effect { } } }); - } - Self { - inner: StoredValue::new_with_storage(Some(inner)), - } - } - - /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values - /// that are read inside it change. - /// - /// This spawns a task that can be run on any thread. For an effect that will be spawned on - /// the current thread, use [`new`](Effect::new). - pub fn new_sync( - mut fun: impl FnMut(Option) -> T + Send + Sync + 'static, - ) -> Self - where - T: Send + Sync + 'static, - { - let (mut rx, owner, inner) = effect_base(); - let mut first_run = true; - let value = Arc::new(RwLock::new(None::)); - - if cfg!(feature = "effects") { - Executor::spawn({ - let value = Arc::clone(&value); - let subscriber = inner.to_any_subscriber(); - - async move { - while rx.next().await.is_some() { - if first_run - || subscriber.with_observer(|| { - subscriber.update_if_necessary() - }) - { - first_run = false; - subscriber.clear_sources(&subscriber); - - let old_value = - mem::take(&mut *value.write().or_poisoned()); - let new_value = owner.with_cleanup(|| { - subscriber.with_observer(|| fun(old_value)) - }); - *value.write().or_poisoned() = Some(new_value); - } - } - } - }); - } - - Self { - inner: StoredValue::new_with_storage(Some(inner)), - } - } - - /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values - /// that are read inside it change. - /// - /// This will run whether the `effects` feature is enabled or not. - pub fn new_isomorphic( - mut fun: impl FnMut(Option) -> T + Send + Sync + 'static, - ) -> Self - where - T: Send + Sync + 'static, - { - let (mut rx, owner, inner) = effect_base(); - let mut first_run = true; - let value = Arc::new(RwLock::new(None::)); - - Executor::spawn({ - let value = Arc::clone(&value); - let subscriber = inner.to_any_subscriber(); - - async move { - while rx.next().await.is_some() { - if first_run - || subscriber - .with_observer(|| subscriber.update_if_necessary()) - { - first_run = false; - subscriber.clear_sources(&subscriber); - - let old_value = - mem::take(&mut *value.write().or_poisoned()); - let new_value = owner.with_cleanup(|| { - subscriber.with_observer(|| fun(old_value)) - }); - *value.write().or_poisoned() = Some(new_value); - } - } - } + StoredValue::new_with_storage(Some(inner)) }); - Self { - inner: StoredValue::new_with_storage(Some(inner)), - } + + Self { inner } } /// A version of [`Effect::new`] that only listens to any dependency @@ -340,12 +265,12 @@ impl Effect { D: 'static, T: 'static, { - let (mut rx, owner, inner) = effect_base(); - let mut first_run = true; - let dep_value = Arc::new(RwLock::new(None::)); - let watch_value = Arc::new(RwLock::new(None::)); + let inner = cfg!(feature = "effects").then(|| { + let (mut rx, owner, inner) = effect_base(); + let mut first_run = true; + let dep_value = Arc::new(RwLock::new(None::)); + let watch_value = Arc::new(RwLock::new(None::)); - if cfg!(feature = "effects") { Executor::spawn_local({ let dep_value = Arc::clone(&dep_value); let watch_value = Arc::clone(&watch_value); @@ -390,10 +315,102 @@ impl Effect { } } }); - } + + StoredValue::new_with_storage(Some(inner)) + }); + + Self { inner } + } +} + +impl Effect { + /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values + /// that are read inside it change. + /// + /// This spawns a task that can be run on any thread. For an effect that will be spawned on + /// the current thread, use [`new`](Effect::new). + pub fn new_sync( + mut fun: impl FnMut(Option) -> T + Send + Sync + 'static, + ) -> Self + where + T: Send + Sync + 'static, + { + let inner = cfg!(feature = "effects").then(|| { + let (mut rx, owner, inner) = effect_base(); + let mut first_run = true; + let value = Arc::new(RwLock::new(None::)); + + Executor::spawn({ + let value = Arc::clone(&value); + let subscriber = inner.to_any_subscriber(); + + async move { + while rx.next().await.is_some() { + if first_run + || subscriber.with_observer(|| { + subscriber.update_if_necessary() + }) + { + first_run = false; + subscriber.clear_sources(&subscriber); + + let old_value = + mem::take(&mut *value.write().or_poisoned()); + let new_value = owner.with_cleanup(|| { + subscriber.with_observer(|| fun(old_value)) + }); + *value.write().or_poisoned() = Some(new_value); + } + } + } + }); + + StoredValue::new_with_storage(Some(inner)) + }); + + Self { inner } + } + + /// Creates a new effect, which runs once on the next “tick”, and then runs again when reactive values + /// that are read inside it change. + /// + /// This will run whether the `effects` feature is enabled or not. + pub fn new_isomorphic( + mut fun: impl FnMut(Option) -> T + Send + Sync + 'static, + ) -> Self + where + T: Send + Sync + 'static, + { + let (mut rx, owner, inner) = effect_base(); + let mut first_run = true; + let value = Arc::new(RwLock::new(None::)); + + Executor::spawn({ + let value = Arc::clone(&value); + let subscriber = inner.to_any_subscriber(); + + async move { + while rx.next().await.is_some() { + if first_run + || subscriber + .with_observer(|| subscriber.update_if_necessary()) + { + first_run = false; + subscriber.clear_sources(&subscriber); + + let old_value = + mem::take(&mut *value.write().or_poisoned()); + let new_value = owner.with_cleanup(|| { + subscriber.with_observer(|| fun(old_value)) + }); + *value.write().or_poisoned() = Some(new_value); + } + } + } + }); Self { - inner: StoredValue::new_with_storage(Some(inner)), + inner: Some(StoredValue::new_with_storage(Some(inner))), } } @@ -415,7 +432,7 @@ impl Effect { let dep_value = Arc::new(RwLock::new(None::)); let watch_value = Arc::new(RwLock::new(None::)); - if cfg!(feature = "effects") { + let inner = cfg!(feature = "effects").then(|| { Executor::spawn({ let dep_value = Arc::clone(&dep_value); let watch_value = Arc::clone(&watch_value); @@ -460,22 +477,28 @@ impl Effect { } } }); - } - Self { - inner: StoredValue::new_with_storage(Some(inner)), - } + StoredValue::new_with_storage(Some(inner)) + }); + + Self { inner } } } -impl ToAnySubscriber for Effect { +impl ToAnySubscriber for Effect +where + S: Storage, +{ fn to_any_subscriber(&self) -> AnySubscriber { self.inner - .try_with_value(|inner| { - inner.as_ref().map(|inner| inner.to_any_subscriber()) + .and_then(|inner| { + inner + .try_with_value(|inner| { + inner.as_ref().map(|inner| inner.to_any_subscriber()) + }) + .flatten() }) - .flatten() - .expect("tried to subscribe to effect that has been stopped") + .expect("tried to set effect that has been stopped") } } @@ -484,7 +507,9 @@ impl ToAnySubscriber for Effect { #[track_caller] #[deprecated = "This function is being removed to conform to Rust \ idioms.Please use `Effect::new()` instead."] -pub fn create_effect(fun: impl FnMut(Option) -> T + 'static) -> Effect +pub fn create_effect( + fun: impl FnMut(Option) -> T + 'static, +) -> Effect where T: 'static, { diff --git a/reactive_graph/src/effect/inner.rs b/reactive_graph/src/effect/inner.rs index 5c818f2a25..a781f9c4a0 100644 --- a/reactive_graph/src/effect/inner.rs +++ b/reactive_graph/src/effect/inner.rs @@ -8,11 +8,12 @@ use crate::{ use or_poisoned::OrPoisoned; use std::sync::{Arc, RwLock, Weak}; +/// Handles internal subscription logic for effects. #[derive(Debug)] -pub(crate) struct EffectInner { - pub dirty: bool, - pub observer: Sender, - pub sources: SourceSet, +pub struct EffectInner { + pub(crate) dirty: bool, + pub(crate) observer: Sender, + pub(crate) sources: SourceSet, } impl ToAnySubscriber for Arc> {