From 33639d69a6aa47bc67d5d9a72cf4e8c88ec0bac8 Mon Sep 17 00:00:00 2001 From: Lars Berger Date: Sat, 16 Nov 2024 19:51:51 +0800 Subject: [PATCH] feat: move provider emit listener to event listener loop --- packages/desktop/src/main.rs | 24 +++++- .../src/providers/disk/disk_provider.rs | 14 +-- .../providers/komorebi/komorebi_provider.rs | 9 +- .../src/providers/media/media_provider.rs | 21 +++-- .../src/providers/network/network_provider.rs | 10 +-- packages/desktop/src/providers/provider.rs | 13 +-- .../src/providers/provider_function.rs | 12 +++ .../desktop/src/providers/provider_manager.rs | 86 ++++++++++++------- .../desktop/src/providers/provider_ref.rs | 69 ++++----------- packages/desktop/src/sys_tray.rs | 4 +- 10 files changed, 139 insertions(+), 123 deletions(-) diff --git a/packages/desktop/src/main.rs b/packages/desktop/src/main.rs index c5f4be96..e7baee5a 100644 --- a/packages/desktop/src/main.rs +++ b/packages/desktop/src/main.rs @@ -173,8 +173,8 @@ async fn start_app(app: &mut tauri::App, cli: Cli) -> anyhow::Result<()> { app.handle().plugin(tauri_plugin_dialog::init())?; // Initialize `ProviderManager` in Tauri state. - let manager = Arc::new(ProviderManager::new(app.handle())); - app.manage(manager); + let (manager, emit_rx) = ProviderManager::new(app.handle()); + app.manage(manager.clone()); // Open widgets based on CLI command. open_widgets_by_cli_command(cli, widget_factory.clone()).await?; @@ -184,7 +184,15 @@ async fn start_app(app: &mut tauri::App, cli: Cli) -> anyhow::Result<()> { SysTray::new(app.handle(), config.clone(), widget_factory.clone()) .await?; - listen_events(app.handle(), config, monitor_state, widget_factory, tray); + listen_events( + app.handle(), + config, + monitor_state, + widget_factory, + tray, + manager, + emit_rx, + ); Ok(()) } @@ -194,7 +202,9 @@ fn listen_events( config: Arc, monitor_state: Arc, widget_factory: Arc, - tray: Arc, + tray: SysTray, + manager: Arc, + emit_rx: mpsc::Receiver, ) { let app_handle = app_handle.clone(); let mut widget_open_rx = widget_factory.open_tx.subscribe(); @@ -231,6 +241,12 @@ fn listen_events( info!("Widget configs changed."); widget_factory.relaunch_by_paths(&changed_configs.keys().cloned().collect()).await }, + Ok(provider_emit) = emit_rx.recv() => { + info!("Provider emission: {:?}", provider_emit); + app_handle.emit("provider-emit", provider_emit); + manager.update_cache(provider_emit).await; + Ok(()) + }, }; if let Err(err) = res { diff --git a/packages/desktop/src/providers/disk/disk_provider.rs b/packages/desktop/src/providers/disk/disk_provider.rs index 19f86153..320d6349 100644 --- a/packages/desktop/src/providers/disk/disk_provider.rs +++ b/packages/desktop/src/providers/disk/disk_provider.rs @@ -36,7 +36,7 @@ pub struct Disk { pub struct DiskProvider { config: DiskProviderConfig, - system: Arc>, + disks: Arc>, } #[derive(Debug, Clone, PartialEq, Serialize)] @@ -50,11 +50,11 @@ pub struct DiskSizeMeasure { } impl DiskProvider { - pub fn new( - config: DiskProviderConfig, - system: Arc>, - ) -> DiskProvider { - DiskProvider { config, system } + pub fn new(config: DiskProviderConfig) -> DiskProvider { + DiskProvider { + config, + disks: Arc::new(Mutex::new(Disks::new_with_refreshed_list())), + } } fn refresh_interval_ms(&self) -> u64 { @@ -62,7 +62,7 @@ impl DiskProvider { } async fn run_interval(&self) -> anyhow::Result { - let mut disks = self.system.lock().await; + let mut disks = self.disks.lock().await; disks.refresh(); let disks = disks diff --git a/packages/desktop/src/providers/komorebi/komorebi_provider.rs b/packages/desktop/src/providers/komorebi/komorebi_provider.rs index cd84a431..745131af 100644 --- a/packages/desktop/src/providers/komorebi/komorebi_provider.rs +++ b/packages/desktop/src/providers/komorebi/komorebi_provider.rs @@ -16,7 +16,7 @@ use super::{ KomorebiContainer, KomorebiLayout, KomorebiLayoutFlip, KomorebiMonitor, KomorebiWindow, KomorebiWorkspace, }; -use crate::providers::{Provider, ProviderOutput, ProviderResult}; +use crate::providers::{Provider, ProviderEmission, ProviderOutput}; const SOCKET_NAME: &str = "zebar.sock"; @@ -42,7 +42,7 @@ impl KomorebiProvider { fn create_socket( &self, - emit_result_tx: Sender, + emit_result_tx: mpsc::UnboundedSender, ) -> anyhow::Result<()> { let socket = komorebi_client::subscribe(SOCKET_NAME) .context("Failed to initialize Komorebi socket.")?; @@ -187,7 +187,10 @@ impl Provider for KomorebiProvider { RuntimeType::Sync } - fn start_sync(&mut self, emit_result_tx: Sender) { + fn start_sync( + &mut self, + emit_result_tx: mpsc::UnboundedSender, + ) { if let Err(err) = self.create_socket(emit_result_tx.clone()) { emit_result_tx.try_send(Err(err).into()); } diff --git a/packages/desktop/src/providers/media/media_provider.rs b/packages/desktop/src/providers/media/media_provider.rs index e3f80974..a1fb03bf 100644 --- a/packages/desktop/src/providers/media/media_provider.rs +++ b/packages/desktop/src/providers/media/media_provider.rs @@ -5,7 +5,10 @@ use std::{ use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use tokio::{sync::mpsc::Sender, task}; +use tokio::{ + sync::mpsc::{self, Sender}, + task, +}; use tracing::{debug, error}; use windows::{ Foundation::{EventRegistrationToken, TypedEventHandler}, @@ -16,7 +19,9 @@ use windows::{ }, }; -use crate::providers::{Provider, ProviderOutput, ProviderResult}; +use crate::providers::{ + Provider, ProviderEmission, ProviderOutput, RuntimeType, +}; #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase")] @@ -60,7 +65,7 @@ impl MediaProvider { fn emit_media_info( session: Option<&GsmtcSession>, - emit_result_tx: Sender, + emit_result_tx: mpsc::UnboundedSender, ) { let _ = match Self::media_output(session) { Ok(media_output) => emit_result_tx @@ -124,7 +129,7 @@ impl MediaProvider { } fn create_session_manager( - emit_result_tx: Sender, + emit_result_tx: mpsc::UnboundedSender, ) -> anyhow::Result<()> { debug!("Creating media session manager."); @@ -216,7 +221,7 @@ impl MediaProvider { fn add_session_listeners( session: &GsmtcSession, - emit_result_tx: Sender, + emit_result_tx: mpsc::UnboundedSender, ) -> windows::core::Result { debug!("Adding session listeners."); @@ -281,7 +286,11 @@ impl MediaProvider { #[async_trait] impl Provider for MediaProvider { - async fn run(&self, emit_result_tx: Sender) { + fn runtime_type(&self) -> RuntimeType { + RuntimeType::Async + } + + async fn start_async(&mut self) { task::spawn_blocking(move || { if let Err(err) = Self::create_session_manager(emit_result_tx.clone()) diff --git a/packages/desktop/src/providers/network/network_provider.rs b/packages/desktop/src/providers/network/network_provider.rs index 48c36176..29734ef5 100644 --- a/packages/desktop/src/providers/network/network_provider.rs +++ b/packages/desktop/src/providers/network/network_provider.rs @@ -36,11 +36,11 @@ pub struct NetworkProvider { } impl NetworkProvider { - pub fn new( - config: NetworkProviderConfig, - netinfo: Arc>, - ) -> NetworkProvider { - NetworkProvider { config, netinfo } + pub fn new(config: NetworkProviderConfig) -> NetworkProvider { + NetworkProvider { + config, + netinfo: Arc::new(Mutex::new(Networks::new_with_refreshed_list())), + } } fn refresh_interval_ms(&self) -> u64 { diff --git a/packages/desktop/src/providers/provider.rs b/packages/desktop/src/providers/provider.rs index b266ad93..526bd1c9 100644 --- a/packages/desktop/src/providers/provider.rs +++ b/packages/desktop/src/providers/provider.rs @@ -1,7 +1,6 @@ use async_trait::async_trait; -use tokio::sync::mpsc; -use super::ProviderResult; +use super::{ProviderFunction, ProviderFunctionResult}; #[async_trait] pub trait Provider: Send + Sync { @@ -99,13 +98,7 @@ macro_rules! impl_interval_provider { crate::providers::RuntimeType::Async } - async fn run_async( - &mut self, - emit_result_tx: tokio::sync::mpsc::Sender< - crate::providers::ProviderResult, - >, - _stop_rx: tokio::sync::mpsc::Receiver<()>, - ) { + async fn start_async(&mut self) { let mut interval = tokio::time::interval( std::time::Duration::from_millis(self.refresh_interval_ms()), ); @@ -116,7 +109,7 @@ macro_rules! impl_interval_provider { .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut last_interval_res: Option< - crate::providers::ProviderResult, + crate::providers::ProviderEmission, > = None; loop { diff --git a/packages/desktop/src/providers/provider_function.rs b/packages/desktop/src/providers/provider_function.rs index 0138f122..2c53d0ce 100644 --- a/packages/desktop/src/providers/provider_function.rs +++ b/packages/desktop/src/providers/provider_function.rs @@ -11,3 +11,15 @@ pub enum MediaFunction { Next, Previous, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ProviderFunctionResult { + Media(MediaFunctionResult), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum MediaFunctionResult { + PlayPause(bool), + Next(bool), + Previous(bool), +} diff --git a/packages/desktop/src/providers/provider_manager.rs b/packages/desktop/src/providers/provider_manager.rs index fbceeea8..9eb8ca85 100644 --- a/packages/desktop/src/providers/provider_manager.rs +++ b/packages/desktop/src/providers/provider_manager.rs @@ -1,12 +1,18 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::Context; -use sysinfo::{Disks, Networks, System}; -use tauri::AppHandle; -use tokio::sync::Mutex; +use sysinfo::System; +use tauri::{AppHandle, Emitter}; +use tokio::{ + sync::{mpsc, Mutex}, + task, +}; use tracing::warn; -use super::{ProviderConfig, ProviderRef}; +use super::{ + ProviderConfig, ProviderEmission, ProviderFunction, + ProviderFunctionResult, ProviderRef, RuntimeType, +}; /// State shared between providers. #[derive(Clone)] @@ -18,18 +24,33 @@ pub struct SharedProviderState { pub struct ProviderManager { app_handle: AppHandle, provider_refs: Arc>>, + emit_cache: Arc>>, shared_state: SharedProviderState, + emit_tx: mpsc::UnboundedSender, } impl ProviderManager { - pub fn new(app_handle: &AppHandle) -> Self { - Self { - app_handle: app_handle.clone(), - provider_refs: Arc::new(Mutex::new(HashMap::new())), - shared_state: SharedProviderState { - sysinfo: Arc::new(Mutex::new(System::new_all())), - }, - } + /// Creates a new provider manager. + /// + /// Returns a tuple containing the manager and a channel for provider + /// emissions. + pub fn new( + app_handle: &AppHandle, + ) -> (Arc, mpsc::UnboundedReceiver) { + let (emit_tx, emit_rx) = mpsc::unbounded_channel::(); + + ( + Arc::new(Self { + app_handle: app_handle.clone(), + provider_refs: Arc::new(Mutex::new(HashMap::new())), + emit_cache: Arc::new(Mutex::new(HashMap::new())), + shared_state: SharedProviderState { + sysinfo: Arc::new(Mutex::new(System::new_all())), + }, + emit_tx, + }), + emit_rx, + ) } /// Creates a provider with the given config. @@ -38,22 +59,19 @@ impl ProviderManager { config_hash: String, config: ProviderConfig, ) -> anyhow::Result<()> { + // If a provider with the given config already exists, re-emit its + // latest emission and return early. { - let mut providers = self.provider_refs.lock().await; - - // If a provider with the given config already exists, refresh it - // and return early. - if let Some(found_provider) = providers.get_mut(&config_hash) { - if let Err(err) = found_provider.refresh().await { - warn!("Error refreshing provider: {:?}", err); - } - + if let Some(found_emit) = + self.emit_cache.lock().await.get(&config_hash) + { + self.app_handle.emit("provider-emit", found_emit); return Ok(()); }; } let provider_ref = ProviderRef::new( - &self.app_handle, + self.emit_tx.clone(), config, config_hash.clone(), self.shared_state.clone(), @@ -73,19 +91,17 @@ impl ProviderManager { function: ProviderFunction, ) -> anyhow::Result { let mut providers = self.provider_refs.lock().await; - let found_provider = providers - .get_mut(&config_hash) + let provider = providers + .get_mut(config_hash) .context("No provider found with config.")?; - match found_provider.runtime_type() { - RuntimeType::Async => { - found_provider.call_async_function(function).await + match provider.runtime_type() { + RuntimeType::Async => provider.call_async_function(function).await, + RuntimeType::Sync => { + task::spawn_blocking(move || provider.call_sync_function(function)) + .await + .map_err(|err| format!("Function execution failed: {}", err))? } - RuntimeType::Sync => task::spawn_blocking(move || { - found_provider.call_sync_function(function) - }) - .await - .map_err(|err| format!("Function execution failed: {}", err))?, } } @@ -103,4 +119,10 @@ impl ProviderManager { Ok(()) } + + /// Updates the cache with the given provider emission. + pub async fn update_cache(&self, emit: ProviderEmission) { + let mut cache = self.emit_cache.lock().await; + cache.insert(emit.config_hash, emit); + } } diff --git a/packages/desktop/src/providers/provider_ref.rs b/packages/desktop/src/providers/provider_ref.rs index 52b8d387..02d80d22 100644 --- a/packages/desktop/src/providers/provider_ref.rs +++ b/packages/desktop/src/providers/provider_ref.rs @@ -5,7 +5,10 @@ use serde::Serialize; use serde_json::json; use tauri::{AppHandle, Emitter}; use tokio::{ - sync::{mpsc, Mutex}, + sync::{ + mpsc::{self, UnboundedSender}, + Mutex, + }, task, }; use tracing::{info, warn}; @@ -25,11 +28,11 @@ use super::{ /// Reference to an active provider. pub struct ProviderRef { /// Cache for provider output. - cache: Arc>>>, + cache: Arc>>>, /// Sender channel for emitting provider output/error to frontend /// clients. - emit_result_tx: mpsc::Sender, + emit_result_tx: mpsc::UnboundedSender, /// Sender channel for stopping the provider. stop_tx: mpsc::Sender<()>, @@ -41,17 +44,17 @@ pub struct ProviderRef { /// in a nicer way. #[derive(Debug, Clone, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] -pub enum ProviderResult { +pub enum ProviderEmission { Output(ProviderOutput), Error(String), } /// Implements conversion from `anyhow::Result`. -impl From> for ProviderResult { +impl From> for ProviderEmission { fn from(result: anyhow::Result) -> Self { match result { - Ok(output) => ProviderResult::Output(output), - Err(err) => ProviderResult::Error(err.to_string()), + Ok(output) => ProviderEmission::Output(output), + Err(err) => ProviderEmission::Error(err.to_string()), } } } @@ -59,7 +62,7 @@ impl From> for ProviderResult { impl ProviderRef { /// Creates a new `ProviderRef` instance. pub async fn new( - app_handle: &AppHandle, + emit_result_tx: UnboundedSender, config: ProviderConfig, config_hash: String, shared_state: SharedProviderState, @@ -67,15 +70,6 @@ impl ProviderRef { let cache = Arc::new(Mutex::new(None)); let (stop_tx, stop_rx) = mpsc::channel::<()>(1); - let (emit_result_tx, emit_result_rx) = - mpsc::channel::(1); - - Self::start_output_listener( - app_handle.clone(), - config_hash.clone(), - cache.clone(), - emit_result_rx, - ); Self::start_provider( config, @@ -92,42 +86,12 @@ impl ProviderRef { }) } - fn start_output_listener( - app_handle: AppHandle, - config_hash: String, - cache: Arc>>>, - mut emit_result_rx: mpsc::Receiver, - ) { - task::spawn(async move { - while let Some(output) = emit_result_rx.recv().await { - info!("Emitting for provider: {}", config_hash); - - let output = Box::new(output); - let payload = json!({ - "configHash": config_hash.clone(), - "result": *output.clone(), - }); - - if let Err(err) = app_handle.emit("provider-emit", payload) { - warn!("Error emitting provider output: {:?}", err); - } - - // Update the provider's output cache. - if let Ok(mut providers) = cache.try_lock() { - *providers = Some(output); - } else { - warn!("Failed to update provider output cache."); - } - } - }); - } - /// Starts the provider in a separate task. fn start_provider( config: ProviderConfig, config_hash: String, shared_state: SharedProviderState, - emit_result_tx: mpsc::Sender, + emit_result_tx: mpsc::UnboundedSender, mut stop_rx: mpsc::Receiver<()>, ) -> anyhow::Result<()> { let mut provider = Self::create_provider(config, shared_state)?; @@ -170,13 +134,10 @@ impl ProviderRef { ProviderConfig::Memory(config) => { Box::new(MemoryProvider::new(config, shared_state.sysinfo.clone())) } - ProviderConfig::Disk(config) => { - Box::new(DiskProvider::new(config, shared_state.diskinfo.clone())) + ProviderConfig::Disk(config) => Box::new(DiskProvider::new(config)), + ProviderConfig::Network(config) => { + Box::new(NetworkProvider::new(config)) } - ProviderConfig::Network(config) => Box::new(NetworkProvider::new( - config, - shared_state.netinfo.clone(), - )), ProviderConfig::Weather(config) => { Box::new(WeatherProvider::new(config)) } diff --git a/packages/desktop/src/sys_tray.rs b/packages/desktop/src/sys_tray.rs index 8e2ec538..d83d580b 100644 --- a/packages/desktop/src/sys_tray.rs +++ b/packages/desktop/src/sys_tray.rs @@ -116,7 +116,7 @@ impl SysTray { app_handle: &AppHandle, config: Arc, widget_factory: Arc, - ) -> anyhow::Result> { + ) -> anyhow::Result { let mut sys_tray = Self { app_handle: app_handle.clone(), config, @@ -126,7 +126,7 @@ impl SysTray { sys_tray.tray_icon = Some(sys_tray.create_tray_icon().await?); - Ok(Arc::new(sys_tray)) + Ok(sys_tray) } async fn create_tray_icon(&self) -> anyhow::Result {