Skip to content

Commit

Permalink
feat: move provider emit listener to event listener loop
Browse files Browse the repository at this point in the history
  • Loading branch information
lars-berger committed Nov 16, 2024
1 parent 0821c1b commit 33639d6
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 123 deletions.
24 changes: 20 additions & 4 deletions packages/desktop/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ async fn start_app(app: &mut tauri::App, cli: Cli) -> anyhow::Result<()> {
app.handle().plugin(tauri_plugin_dialog::init())?;

// Initialize `ProviderManager` in Tauri state.
let manager = Arc::new(ProviderManager::new(app.handle()));
app.manage(manager);
let (manager, emit_rx) = ProviderManager::new(app.handle());
app.manage(manager.clone());

// Open widgets based on CLI command.
open_widgets_by_cli_command(cli, widget_factory.clone()).await?;
Expand All @@ -184,7 +184,15 @@ async fn start_app(app: &mut tauri::App, cli: Cli) -> anyhow::Result<()> {
SysTray::new(app.handle(), config.clone(), widget_factory.clone())
.await?;

listen_events(app.handle(), config, monitor_state, widget_factory, tray);
listen_events(
app.handle(),
config,
monitor_state,
widget_factory,
tray,
manager,
emit_rx,
);

Ok(())
}
Expand All @@ -194,7 +202,9 @@ fn listen_events(
config: Arc<Config>,
monitor_state: Arc<MonitorState>,
widget_factory: Arc<WidgetFactory>,
tray: Arc<SysTray>,
tray: SysTray,
manager: Arc<ProviderManager>,
emit_rx: mpsc::Receiver<ProviderResult>,
) {
let app_handle = app_handle.clone();
let mut widget_open_rx = widget_factory.open_tx.subscribe();
Expand Down Expand Up @@ -231,6 +241,12 @@ fn listen_events(
info!("Widget configs changed.");
widget_factory.relaunch_by_paths(&changed_configs.keys().cloned().collect()).await
},
Ok(provider_emit) = emit_rx.recv() => {
info!("Provider emission: {:?}", provider_emit);
app_handle.emit("provider-emit", provider_emit);
manager.update_cache(provider_emit).await;
Ok(())
},
};

if let Err(err) = res {
Expand Down
14 changes: 7 additions & 7 deletions packages/desktop/src/providers/disk/disk_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct Disk {

pub struct DiskProvider {
config: DiskProviderConfig,
system: Arc<Mutex<Disks>>,
disks: Arc<Mutex<Disks>>,
}

#[derive(Debug, Clone, PartialEq, Serialize)]
Expand All @@ -50,19 +50,19 @@ pub struct DiskSizeMeasure {
}

impl DiskProvider {
pub fn new(
config: DiskProviderConfig,
system: Arc<Mutex<Disks>>,
) -> DiskProvider {
DiskProvider { config, system }
pub fn new(config: DiskProviderConfig) -> DiskProvider {
DiskProvider {
config,
disks: Arc::new(Mutex::new(Disks::new_with_refreshed_list())),
}
}

fn refresh_interval_ms(&self) -> u64 {
self.config.refresh_interval
}

async fn run_interval(&self) -> anyhow::Result<ProviderOutput> {
let mut disks = self.system.lock().await;
let mut disks = self.disks.lock().await;
disks.refresh();

let disks = disks
Expand Down
9 changes: 6 additions & 3 deletions packages/desktop/src/providers/komorebi/komorebi_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::{
KomorebiContainer, KomorebiLayout, KomorebiLayoutFlip, KomorebiMonitor,
KomorebiWindow, KomorebiWorkspace,
};
use crate::providers::{Provider, ProviderOutput, ProviderResult};
use crate::providers::{Provider, ProviderEmission, ProviderOutput};

const SOCKET_NAME: &str = "zebar.sock";

Expand All @@ -42,7 +42,7 @@ impl KomorebiProvider {

fn create_socket(
&self,
emit_result_tx: Sender<ProviderResult>,
emit_result_tx: mpsc::UnboundedSender<ProviderEmission>,
) -> anyhow::Result<()> {
let socket = komorebi_client::subscribe(SOCKET_NAME)
.context("Failed to initialize Komorebi socket.")?;
Expand Down Expand Up @@ -187,7 +187,10 @@ impl Provider for KomorebiProvider {
RuntimeType::Sync
}

fn start_sync(&mut self, emit_result_tx: Sender<ProviderResult>) {
fn start_sync(
&mut self,
emit_result_tx: mpsc::UnboundedSender<ProviderEmission>,
) {
if let Err(err) = self.create_socket(emit_result_tx.clone()) {
emit_result_tx.try_send(Err(err).into());
}
Expand Down
21 changes: 15 additions & 6 deletions packages/desktop/src/providers/media/media_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::{

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::{sync::mpsc::Sender, task};
use tokio::{
sync::mpsc::{self, Sender},
task,
};
use tracing::{debug, error};
use windows::{
Foundation::{EventRegistrationToken, TypedEventHandler},
Expand All @@ -16,7 +19,9 @@ use windows::{
},
};

use crate::providers::{Provider, ProviderOutput, ProviderResult};
use crate::providers::{
Provider, ProviderEmission, ProviderOutput, RuntimeType,
};

#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -60,7 +65,7 @@ impl MediaProvider {

fn emit_media_info(
session: Option<&GsmtcSession>,
emit_result_tx: Sender<ProviderResult>,
emit_result_tx: mpsc::UnboundedSender<ProviderEmission>,
) {
let _ = match Self::media_output(session) {
Ok(media_output) => emit_result_tx
Expand Down Expand Up @@ -124,7 +129,7 @@ impl MediaProvider {
}

fn create_session_manager(
emit_result_tx: Sender<ProviderResult>,
emit_result_tx: mpsc::UnboundedSender<ProviderEmission>,
) -> anyhow::Result<()> {
debug!("Creating media session manager.");

Expand Down Expand Up @@ -216,7 +221,7 @@ impl MediaProvider {

fn add_session_listeners(
session: &GsmtcSession,
emit_result_tx: Sender<ProviderResult>,
emit_result_tx: mpsc::UnboundedSender<ProviderEmission>,
) -> windows::core::Result<EventTokens> {
debug!("Adding session listeners.");

Expand Down Expand Up @@ -281,7 +286,11 @@ impl MediaProvider {

#[async_trait]
impl Provider for MediaProvider {
async fn run(&self, emit_result_tx: Sender<ProviderResult>) {
fn runtime_type(&self) -> RuntimeType {
RuntimeType::Async
}

async fn start_async(&mut self) {
task::spawn_blocking(move || {
if let Err(err) =
Self::create_session_manager(emit_result_tx.clone())
Expand Down
10 changes: 5 additions & 5 deletions packages/desktop/src/providers/network/network_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ pub struct NetworkProvider {
}

impl NetworkProvider {
pub fn new(
config: NetworkProviderConfig,
netinfo: Arc<Mutex<Networks>>,
) -> NetworkProvider {
NetworkProvider { config, netinfo }
pub fn new(config: NetworkProviderConfig) -> NetworkProvider {
NetworkProvider {
config,
netinfo: Arc::new(Mutex::new(Networks::new_with_refreshed_list())),
}
}

fn refresh_interval_ms(&self) -> u64 {
Expand Down
13 changes: 3 additions & 10 deletions packages/desktop/src/providers/provider.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use async_trait::async_trait;
use tokio::sync::mpsc;

use super::ProviderResult;
use super::{ProviderFunction, ProviderFunctionResult};

#[async_trait]
pub trait Provider: Send + Sync {
Expand Down Expand Up @@ -99,13 +98,7 @@ macro_rules! impl_interval_provider {
crate::providers::RuntimeType::Async
}

async fn run_async(
&mut self,
emit_result_tx: tokio::sync::mpsc::Sender<
crate::providers::ProviderResult,
>,
_stop_rx: tokio::sync::mpsc::Receiver<()>,
) {
async fn start_async(&mut self) {
let mut interval = tokio::time::interval(
std::time::Duration::from_millis(self.refresh_interval_ms()),
);
Expand All @@ -116,7 +109,7 @@ macro_rules! impl_interval_provider {
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut last_interval_res: Option<
crate::providers::ProviderResult,
crate::providers::ProviderEmission,
> = None;

loop {
Expand Down
12 changes: 12 additions & 0 deletions packages/desktop/src/providers/provider_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,15 @@ pub enum MediaFunction {
Next,
Previous,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ProviderFunctionResult {
Media(MediaFunctionResult),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MediaFunctionResult {
PlayPause(bool),
Next(bool),
Previous(bool),
}
86 changes: 54 additions & 32 deletions packages/desktop/src/providers/provider_manager.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use std::{collections::HashMap, sync::Arc};

use anyhow::Context;
use sysinfo::{Disks, Networks, System};
use tauri::AppHandle;
use tokio::sync::Mutex;
use sysinfo::System;
use tauri::{AppHandle, Emitter};
use tokio::{
sync::{mpsc, Mutex},
task,
};
use tracing::warn;

use super::{ProviderConfig, ProviderRef};
use super::{
ProviderConfig, ProviderEmission, ProviderFunction,
ProviderFunctionResult, ProviderRef, RuntimeType,
};

/// State shared between providers.
#[derive(Clone)]
Expand All @@ -18,18 +24,33 @@ pub struct SharedProviderState {
pub struct ProviderManager {
app_handle: AppHandle,
provider_refs: Arc<Mutex<HashMap<String, ProviderRef>>>,
emit_cache: Arc<Mutex<HashMap<String, ProviderEmission>>>,
shared_state: SharedProviderState,
emit_tx: mpsc::UnboundedSender<ProviderEmission>,
}

impl ProviderManager {
pub fn new(app_handle: &AppHandle) -> Self {
Self {
app_handle: app_handle.clone(),
provider_refs: Arc::new(Mutex::new(HashMap::new())),
shared_state: SharedProviderState {
sysinfo: Arc::new(Mutex::new(System::new_all())),
},
}
/// Creates a new provider manager.
///
/// Returns a tuple containing the manager and a channel for provider
/// emissions.
pub fn new(
app_handle: &AppHandle,
) -> (Arc<Self>, mpsc::UnboundedReceiver<ProviderEmission>) {
let (emit_tx, emit_rx) = mpsc::unbounded_channel::<ProviderEmission>();

(
Arc::new(Self {
app_handle: app_handle.clone(),
provider_refs: Arc::new(Mutex::new(HashMap::new())),
emit_cache: Arc::new(Mutex::new(HashMap::new())),
shared_state: SharedProviderState {
sysinfo: Arc::new(Mutex::new(System::new_all())),
},
emit_tx,
}),
emit_rx,
)
}

/// Creates a provider with the given config.
Expand All @@ -38,22 +59,19 @@ impl ProviderManager {
config_hash: String,
config: ProviderConfig,
) -> anyhow::Result<()> {
// If a provider with the given config already exists, re-emit its
// latest emission and return early.
{
let mut providers = self.provider_refs.lock().await;

// If a provider with the given config already exists, refresh it
// and return early.
if let Some(found_provider) = providers.get_mut(&config_hash) {
if let Err(err) = found_provider.refresh().await {
warn!("Error refreshing provider: {:?}", err);
}

if let Some(found_emit) =
self.emit_cache.lock().await.get(&config_hash)
{
self.app_handle.emit("provider-emit", found_emit);
return Ok(());
};
}

let provider_ref = ProviderRef::new(
&self.app_handle,
self.emit_tx.clone(),
config,
config_hash.clone(),
self.shared_state.clone(),
Expand All @@ -73,19 +91,17 @@ impl ProviderManager {
function: ProviderFunction,
) -> anyhow::Result<ProviderFunctionResult> {
let mut providers = self.provider_refs.lock().await;
let found_provider = providers
.get_mut(&config_hash)
let provider = providers
.get_mut(config_hash)
.context("No provider found with config.")?;

match found_provider.runtime_type() {
RuntimeType::Async => {
found_provider.call_async_function(function).await
match provider.runtime_type() {
RuntimeType::Async => provider.call_async_function(function).await,
RuntimeType::Sync => {
task::spawn_blocking(move || provider.call_sync_function(function))
.await
.map_err(|err| format!("Function execution failed: {}", err))?
}
RuntimeType::Sync => task::spawn_blocking(move || {
found_provider.call_sync_function(function)
})
.await
.map_err(|err| format!("Function execution failed: {}", err))?,
}
}

Expand All @@ -103,4 +119,10 @@ impl ProviderManager {

Ok(())
}

/// Updates the cache with the given provider emission.
pub async fn update_cache(&self, emit: ProviderEmission) {
let mut cache = self.emit_cache.lock().await;
cache.insert(emit.config_hash, emit);
}
}
Loading

0 comments on commit 33639d6

Please sign in to comment.