Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
lars-berger committed Nov 15, 2024
1 parent 1410fff commit 392f768
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl Provider for KomorebiProvider {
ThreadingType::Sync
}

fn run_sync(&self, emit_result_tx: Sender<ProviderResult>) {
fn run_sync(&mut self, emit_result_tx: Sender<ProviderResult>) {
if let Err(err) = self.create_socket(emit_result_tx.clone()) {
emit_result_tx.try_send(Err(err).into());
}
Expand Down
2 changes: 2 additions & 0 deletions packages/desktop/src/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ mod memory;
mod network;
mod provider;
mod provider_config;
mod provider_function;
mod provider_manager;
mod provider_output;
mod provider_ref;
mod weather;

pub use provider::*;
pub use provider_config::*;
pub use provider_function::*;
pub use provider_manager::*;
pub use provider_output::*;
pub use provider_ref::*;
21 changes: 15 additions & 6 deletions packages/desktop/src/providers/provider.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc;

use super::ProviderResult;

Expand All @@ -8,13 +8,21 @@ pub trait Provider: Send + Sync {
fn threading_type(&self) -> ThreadingType;

/// Callback for when the provider is started.
async fn run_async(&self, _emit_result_tx: Sender<ProviderResult>) {
// TODO: Change to not implemented exception.
async fn run_async(
&mut self,
_emit_result_tx: mpsc::Sender<ProviderResult>,
_stop_rx: mpsc::Receiver<()>,
) {
// TODO: mpsc::Change to not implemented exception.
todo!()
}

/// Callback for when the provider is started.
fn run_sync(&self, _emit_result_tx: Sender<ProviderResult>) {
fn run_sync(
&mut self,
_emit_result_tx: mpsc::Sender<ProviderResult>,
_stop_rx: mpsc::Receiver<()>,
) {
// TODO: Change to not implemented exception.
todo!()
}
Expand All @@ -41,14 +49,15 @@ macro_rules! impl_interval_provider {
#[async_trait::async_trait]
impl crate::providers::Provider for $type {
fn threading_type(&self) -> crate::providers::ThreadingType {
crate::providers::ThreadingType::Sync
crate::providers::ThreadingType::Async
}

async fn run_async(
&self,
&mut self,
emit_result_tx: tokio::sync::mpsc::Sender<
crate::providers::ProviderResult,
>,
_stop_rx: tokio::sync::mpsc::Receiver<()>,
) {
let mut interval = tokio::time::interval(
std::time::Duration::from_millis(self.refresh_interval_ms()),
Expand Down
13 changes: 13 additions & 0 deletions packages/desktop/src/providers/provider_function.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ProviderFunction {
Media(MediaFunction),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MediaFunction {
PlayPause,
Next,
Previous,
}
34 changes: 7 additions & 27 deletions packages/desktop/src/providers/provider_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,36 +127,16 @@ impl ProviderRef {
emit_result_tx: mpsc::Sender<ProviderResult>,
mut stop_rx: mpsc::Receiver<()>,
) -> anyhow::Result<()> {
let provider = Self::create_provider(config, shared_state)?;
let mut provider = Self::create_provider(config, shared_state)?;

let _ = match provider.threading_type() {
ThreadingType::Async => {
task::spawn(async move {
// TODO: Add arc `should_stop` to be passed to `run`.

let run = provider.run_async(emit_result_tx);
tokio::pin!(run);

// Ref: https://tokio.rs/tokio/tutorial/select#resuming-an-async-operation
loop {
tokio::select! {
// Default match arm which continuously runs the provider.
_ = run => break,

// On stop, perform any necessary clean up and exit the loop.
Some(_) = stop_rx.recv() => {
info!("Stopping provider: {}", config_hash);
_ = provider.on_stop().await;
break;
},
}
}

info!("Provider stopped: {}", config_hash);
})
}
ThreadingType::Async => task::spawn(async move {
provider.run_async(emit_result_tx, stop_rx).await;
info!("Provider stopped: {}", config_hash);
}),
ThreadingType::Sync => task::spawn_blocking(move || {
let run = provider.run_sync(emit_result_tx);
provider.run_sync(emit_result_tx, stop_rx);
info!("Provider stopped: {}", config_hash);
}),
};

Expand Down

0 comments on commit 392f768

Please sign in to comment.