Skip to content

Commit

Permalink
feat: setup for provider functions (#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
lars-berger authored Nov 20, 2024
1 parent 2c91b70 commit 315608a
Show file tree
Hide file tree
Showing 25 changed files with 1,047 additions and 610 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/desktop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tauri-build = { version = "2.0", features = [] }
anyhow = "1"
async-trait = "0.1"
clap = { version = "4", features = ["derive"] }
crossbeam = "0.8"
reqwest = { version = "0.11", features = ["json"] }
tauri = { version = "2.0", features = [
"devtools",
Expand Down
19 changes: 17 additions & 2 deletions packages/desktop/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use crate::common::macos::WindowExtMacOs;
use crate::common::windows::WindowExtWindows;
use crate::{
config::{Config, WidgetConfig, WidgetPlacement},
providers::{ProviderConfig, ProviderManager},
providers::{
ProviderConfig, ProviderFunction, ProviderFunctionResponse,
ProviderManager,
},
widget_factory::{WidgetFactory, WidgetOpenOptions, WidgetState},
};

Expand Down Expand Up @@ -98,7 +101,19 @@ pub async fn unlisten_provider(
provider_manager: State<'_, Arc<ProviderManager>>,
) -> anyhow::Result<(), String> {
provider_manager
.destroy(config_hash)
.stop(config_hash)
.await
.map_err(|err| err.to_string())
}

#[tauri::command]
pub async fn call_provider_function(
config_hash: String,
function: ProviderFunction,
provider_manager: State<'_, Arc<ProviderManager>>,
) -> anyhow::Result<ProviderFunctionResponse, String> {
provider_manager
.call_function(config_hash, function)
.await
.map_err(|err| err.to_string())
}
Expand Down
67 changes: 67 additions & 0 deletions packages/desktop/src/common/interval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::time::{Duration, Instant};

/// An interval timer for synchronous contexts using crossbeam.
///
/// For use with crossbeam's `select!` macro.
pub struct SyncInterval {
interval: Duration,
next_tick: Instant,
is_first: bool,
}

impl SyncInterval {
pub fn new(interval_ms: u64) -> Self {
Self {
interval: Duration::from_millis(interval_ms),
next_tick: Instant::now(),
is_first: true,
}
}

/// Returns a receiver that will get a message at the next tick time.
pub fn tick(&mut self) -> crossbeam::channel::Receiver<Instant> {
if self.is_first {
// Emit immediately on the first tick.
self.is_first = false;
crossbeam::channel::after(Duration::from_secs(0))
} else if let Some(wait_duration) =
self.next_tick.checked_duration_since(Instant::now())
{
// Wait normally until the next tick.
let timer = crossbeam::channel::after(wait_duration);
self.next_tick += self.interval;
timer
} else {
// We're behind - skip missed ticks to catch up.
while self.next_tick <= Instant::now() {
self.next_tick += self.interval;
}

crossbeam::channel::after(self.next_tick - Instant::now())
}
}
}

/// An interval timer for asynchronous contexts using tokio.
pub struct AsyncInterval {
interval: tokio::time::Interval,
}

impl AsyncInterval {
pub fn new(interval_ms: u64) -> Self {
let mut interval =
tokio::time::interval(Duration::from_millis(interval_ms));

// Skip missed ticks when the interval runs. This prevents a burst
// of backlogged ticks after a delay.
interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

Self { interval }
}

/// Returns a future that will complete at the next tick time.
pub async fn tick(&mut self) {
self.interval.tick().await;
}
}
2 changes: 2 additions & 0 deletions packages/desktop/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod format_bytes;
mod fs_util;
mod interval;
mod length_value;
#[cfg(target_os = "macos")]
pub mod macos;
Expand All @@ -9,5 +10,6 @@ pub mod windows;

pub use format_bytes::*;
pub use fs_util::*;
pub use interval::*;
pub use length_value::*;
pub use path_ext::*;
28 changes: 23 additions & 5 deletions packages/desktop/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use std::{env, sync::Arc};
use clap::Parser;
use cli::MonitorType;
use config::{MonitorSelection, WidgetPlacement};
use providers::ProviderEmission;
use tauri::{
async_runtime::block_on, AppHandle, Emitter, Manager, RunEvent,
};
use tokio::task;
use tokio::{sync::mpsc, task};
use tracing::{error, info, level_filters::LevelFilter};
use tracing_subscriber::EnvFilter;
use widget_factory::WidgetOpenOptions;
Expand Down Expand Up @@ -88,6 +89,7 @@ async fn main() -> anyhow::Result<()> {
commands::update_widget_config,
commands::listen_provider,
commands::unlisten_provider,
commands::call_provider_function,
commands::set_always_on_top,
commands::set_skip_taskbar
])
Expand Down Expand Up @@ -173,8 +175,8 @@ async fn start_app(app: &mut tauri::App, cli: Cli) -> anyhow::Result<()> {
app.handle().plugin(tauri_plugin_dialog::init())?;

// Initialize `ProviderManager` in Tauri state.
let manager = Arc::new(ProviderManager::new(app.handle()));
app.manage(manager);
let (manager, emit_rx) = ProviderManager::new(app.handle());
app.manage(manager.clone());

// Open widgets based on CLI command.
open_widgets_by_cli_command(cli, widget_factory.clone()).await?;
Expand All @@ -184,7 +186,15 @@ async fn start_app(app: &mut tauri::App, cli: Cli) -> anyhow::Result<()> {
SysTray::new(app.handle(), config.clone(), widget_factory.clone())
.await?;

listen_events(app.handle(), config, monitor_state, widget_factory, tray);
listen_events(
app.handle(),
config,
monitor_state,
widget_factory,
tray,
manager,
emit_rx,
);

Ok(())
}
Expand All @@ -194,7 +204,9 @@ fn listen_events(
config: Arc<Config>,
monitor_state: Arc<MonitorState>,
widget_factory: Arc<WidgetFactory>,
tray: Arc<SysTray>,
tray: SysTray,
manager: Arc<ProviderManager>,
mut emit_rx: mpsc::UnboundedReceiver<ProviderEmission>,
) {
let app_handle = app_handle.clone();
let mut widget_open_rx = widget_factory.open_tx.subscribe();
Expand Down Expand Up @@ -231,6 +243,12 @@ fn listen_events(
info!("Widget configs changed.");
widget_factory.relaunch_by_paths(&changed_configs.keys().cloned().collect()).await
},
Some(provider_emission) = emit_rx.recv() => {
info!("Provider emission: {:?}", provider_emission);
app_handle.emit("provider-emit", provider_emission.clone());
manager.update_cache(provider_emission).await;
Ok(())
},
};

if let Err(err) = res {
Expand Down
56 changes: 25 additions & 31 deletions packages/desktop/src/providers/audio/audio_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ use std::{
time::Duration,
};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::{
sync::mpsc::{self, Sender},
sync::mpsc::{self},
task,
time::sleep,
};
use tracing::debug;
use windows::Win32::{
Expand All @@ -33,10 +31,11 @@ use windows::Win32::{
};
use windows_core::PCWSTR;

use crate::providers::{Provider, ProviderOutput, ProviderResult};
use crate::providers::{
CommonProviderState, Provider, ProviderEmitter, RuntimeType,
};

static PROVIDER_TX: OnceLock<mpsc::Sender<ProviderResult>> =
OnceLock::new();
static PROVIDER_TX: OnceLock<ProviderEmitter> = OnceLock::new();

static AUDIO_STATE: OnceLock<Arc<Mutex<AudioOutput>>> = OnceLock::new();

Expand Down Expand Up @@ -279,26 +278,29 @@ impl IMMNotificationClient_Impl for MediaDeviceEventHandler_Impl {
}

pub struct AudioProvider {
_config: AudioProviderConfig,
common: CommonProviderState,
}

impl AudioProvider {
pub fn new(config: AudioProviderConfig) -> Self {
Self { _config: config }
pub fn new(
_config: AudioProviderConfig,
common: CommonProviderState,
) -> Self {
Self { common }
}

fn emit_volume() {
if let Some(tx) = PROVIDER_TX.get() {
let output = AUDIO_STATE.get().unwrap().lock().unwrap().clone();
let _ = tx.try_send(Ok(ProviderOutput::Audio(output)).into());
tx.emit_output(Ok(output));
}
}

async fn handle_volume_updates(mut rx: mpsc::Receiver<(String, u32)>) {
fn handle_volume_updates(mut rx: mpsc::Receiver<(String, u32)>) {
const PROCESS_DELAY: Duration = Duration::from_millis(50);
let mut latest_updates = HashMap::new();

while let Some((device_id, volume)) = rx.recv().await {
while let Some((device_id, volume)) = rx.blocking_recv() {
latest_updates.insert(device_id, volume);

// Collect any additional pending updates without waiting.
Expand All @@ -307,7 +309,7 @@ impl AudioProvider {
}

// Brief delay to collect more potential updates.
sleep(PROCESS_DELAY).await;
std::thread::sleep(PROCESS_DELAY);

// Process all collected updates.
if let Some(state) = AUDIO_STATE.get() {
Expand Down Expand Up @@ -369,11 +371,14 @@ impl AudioProvider {
}
}

#[async_trait]
impl Provider for AudioProvider {
async fn run(&self, emit_result_tx: Sender<ProviderResult>) {
fn runtime_type(&self) -> RuntimeType {
RuntimeType::Sync
}

fn start_sync(&mut self) {
PROVIDER_TX
.set(emit_result_tx.clone())
.set(self.common.emitter.clone())
.expect("Error setting provider tx in audio provider");

AUDIO_STATE
Expand All @@ -383,22 +388,11 @@ impl Provider for AudioProvider {
// Create a channel for volume updates.
let (update_tx, update_rx) = mpsc::channel(100);

// Spawn both tasks.
let update_handler =
task::spawn(Self::handle_volume_updates(update_rx));

let manager = task::spawn_blocking(move || {
if let Err(err) = Self::create_audio_manager(update_tx) {
emit_result_tx
.blocking_send(Err(err).into())
.expect("Error with media provider");
}
});
// Spawn task for handling volume updates.
task::spawn_blocking(move || Self::handle_volume_updates(update_rx));

// Wait for either task to complete (though they should run forever).
tokio::select! {
_ = manager => debug!("Audio manager stopped unexpectedly"),
_ = update_handler => debug!("Update handler stopped unexpectedly"),
if let Err(err) = Self::create_audio_manager(update_tx) {
self.common.emitter.emit_output::<AudioOutput>(Err(err));
}
}
}
Loading

0 comments on commit 315608a

Please sign in to comment.