Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: setup for provider functions #156

Merged
merged 38 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
036f4a7
feat: add `ThreadingType` and conditionally spawn blocking
lars-berger Oct 30, 2024
6833aa4
wip
lars-berger Nov 15, 2024
f87f3df
feat: rename ThreadingType -> RuntimeType; add `call_function` to `Pr…
lars-berger Nov 16, 2024
a148da0
wip add `call_function` on provider manager
lars-berger Nov 16, 2024
9a10437
feat: move provider emit listener to event listener loop
lars-berger Nov 16, 2024
fe13508
feat: change destroy and call_function to use channels
lars-berger Nov 16, 2024
cd27f15
wip moving out logic from `ProviderRef`
lars-berger Nov 16, 2024
4d4a58d
feat: remove `provider_ref.rs`; add all logic to provider manager
lars-berger Nov 16, 2024
d2bdb66
feat: add `CommonProviderState`; replace `ProviderEmission` with type
lars-berger Nov 16, 2024
825448a
refactor: return task handle from `create_instance`
lars-berger Nov 16, 2024
f9a2a64
feat: pass `common` to all providers
lars-berger Nov 16, 2024
ca02eb2
fixes + change how providers are stopped
lars-berger Nov 17, 2024
5bfb93a
feat: add emit_tx
lars-berger Nov 17, 2024
fa8202c
feat: change `ProviderEmission` to a struct
lars-berger Nov 17, 2024
695e555
feat: add `query_ip` method to `IpProvider`
lars-berger Nov 17, 2024
d7e1ca2
feat: add `SyncInterval` and `AsyncInterval`
lars-berger Nov 17, 2024
0dd075b
feat: remove `refresh_interval_ms` getter; add `emit_provider_output` fn
lars-berger Nov 17, 2024
8607697
feat: remove use of `impl_interval_provider`; manually implement `Pro…
lars-berger Nov 17, 2024
c7a4580
fix: add `async_trait` to async providers
lars-berger Nov 17, 2024
efb9f0d
feat: add `ProviderEmitter` helper struct
lars-berger Nov 17, 2024
a225157
feat: add custom serializer for provider emission (compiling and work…
lars-berger Nov 17, 2024
48ba003
feat: add tauri command for calling provider functions
lars-berger Nov 17, 2024
1184282
feat: remove `stop` and `function` channels in favor of a single `mes…
lars-berger Nov 17, 2024
712c06a
wip sync + async receiver with crossbeam
lars-berger Nov 18, 2024
d4ff885
wip add `ProviderConsumer`
lars-berger Nov 18, 2024
eb35221
rename `ProviderConsumer` -> `ProviderInput`
lars-berger Nov 18, 2024
73d4546
feat: handle sending inputs to async + sync runtimes
lars-berger Nov 18, 2024
fe97d94
feat: working crossbeam impl for memory provider (but ticks don't fir…
lars-berger Nov 18, 2024
8d477d1
feat: use crossbeam select! in all sync interval providers
lars-berger Nov 18, 2024
4d9b3de
feat: use `select!` in ip and weather provider
lars-berger Nov 18, 2024
c986c83
feat: evict cache on provider stop
lars-berger Nov 18, 2024
d925568
feat: prevent duplicate providers from being created
lars-berger Nov 18, 2024
6a08237
refactor: improve comments in sync + async interval structs
lars-berger Nov 18, 2024
6a4b28d
refactor: simplify macro for implementing `From<T>` for `ProviderOutput`
lars-berger Nov 19, 2024
06e7a78
fix: emit immediately with `SyncInterval`
lars-berger Nov 19, 2024
922b6ba
feat: add logging
lars-berger Nov 19, 2024
72d7f4e
refactor: remove commented fn
lars-berger Nov 19, 2024
25eefdc
feat: rebase changes for audio provider
lars-berger Nov 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/desktop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tauri-build = { version = "2.0", features = [] }
anyhow = "1"
async-trait = "0.1"
clap = { version = "4", features = ["derive"] }
crossbeam = "0.8"
reqwest = { version = "0.11", features = ["json"] }
tauri = { version = "2.0", features = [
"devtools",
Expand Down
19 changes: 17 additions & 2 deletions packages/desktop/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use crate::common::macos::WindowExtMacOs;
use crate::common::windows::WindowExtWindows;
use crate::{
config::{Config, WidgetConfig, WidgetPlacement},
providers::{ProviderConfig, ProviderManager},
providers::{
ProviderConfig, ProviderFunction, ProviderFunctionResponse,
ProviderManager,
},
widget_factory::{WidgetFactory, WidgetOpenOptions, WidgetState},
};

Expand Down Expand Up @@ -98,7 +101,19 @@ pub async fn unlisten_provider(
provider_manager: State<'_, Arc<ProviderManager>>,
) -> anyhow::Result<(), String> {
provider_manager
.destroy(config_hash)
.stop(config_hash)
.await
.map_err(|err| err.to_string())
}

#[tauri::command]
pub async fn call_provider_function(
config_hash: String,
function: ProviderFunction,
provider_manager: State<'_, Arc<ProviderManager>>,
) -> anyhow::Result<ProviderFunctionResponse, String> {
provider_manager
.call_function(config_hash, function)
.await
.map_err(|err| err.to_string())
}
Expand Down
67 changes: 67 additions & 0 deletions packages/desktop/src/common/interval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::time::{Duration, Instant};

/// An interval timer for synchronous contexts using crossbeam.
///
/// For use with crossbeam's `select!` macro.
pub struct SyncInterval {
interval: Duration,
next_tick: Instant,
is_first: bool,
}

impl SyncInterval {
pub fn new(interval_ms: u64) -> Self {
Self {
interval: Duration::from_millis(interval_ms),
next_tick: Instant::now(),
is_first: true,
}
}

/// Returns a receiver that will get a message at the next tick time.
pub fn tick(&mut self) -> crossbeam::channel::Receiver<Instant> {
if self.is_first {
// Emit immediately on the first tick.
self.is_first = false;
crossbeam::channel::after(Duration::from_secs(0))
} else if let Some(wait_duration) =
self.next_tick.checked_duration_since(Instant::now())
{
// Wait normally until the next tick.
let timer = crossbeam::channel::after(wait_duration);
self.next_tick += self.interval;
timer
} else {
// We're behind - skip missed ticks to catch up.
while self.next_tick <= Instant::now() {
self.next_tick += self.interval;
}

crossbeam::channel::after(self.next_tick - Instant::now())
}
}
}

/// An interval timer for asynchronous contexts using tokio.
pub struct AsyncInterval {
interval: tokio::time::Interval,
}

impl AsyncInterval {
pub fn new(interval_ms: u64) -> Self {
let mut interval =
tokio::time::interval(Duration::from_millis(interval_ms));

// Skip missed ticks when the interval runs. This prevents a burst
// of backlogged ticks after a delay.
interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

Self { interval }
}

/// Returns a future that will complete at the next tick time.
pub async fn tick(&mut self) {
self.interval.tick().await;
}
}
2 changes: 2 additions & 0 deletions packages/desktop/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod format_bytes;
mod fs_util;
mod interval;
mod length_value;
#[cfg(target_os = "macos")]
pub mod macos;
Expand All @@ -9,5 +10,6 @@ pub mod windows;

pub use format_bytes::*;
pub use fs_util::*;
pub use interval::*;
pub use length_value::*;
pub use path_ext::*;
28 changes: 23 additions & 5 deletions packages/desktop/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use std::{env, sync::Arc};
use clap::Parser;
use cli::MonitorType;
use config::{MonitorSelection, WidgetPlacement};
use providers::ProviderEmission;
use tauri::{
async_runtime::block_on, AppHandle, Emitter, Manager, RunEvent,
};
use tokio::task;
use tokio::{sync::mpsc, task};
use tracing::{error, info, level_filters::LevelFilter};
use tracing_subscriber::EnvFilter;
use widget_factory::WidgetOpenOptions;
Expand Down Expand Up @@ -88,6 +89,7 @@ async fn main() -> anyhow::Result<()> {
commands::update_widget_config,
commands::listen_provider,
commands::unlisten_provider,
commands::call_provider_function,
commands::set_always_on_top,
commands::set_skip_taskbar
])
Expand Down Expand Up @@ -173,8 +175,8 @@ async fn start_app(app: &mut tauri::App, cli: Cli) -> anyhow::Result<()> {
app.handle().plugin(tauri_plugin_dialog::init())?;

// Initialize `ProviderManager` in Tauri state.
let manager = Arc::new(ProviderManager::new(app.handle()));
app.manage(manager);
let (manager, emit_rx) = ProviderManager::new(app.handle());
app.manage(manager.clone());

// Open widgets based on CLI command.
open_widgets_by_cli_command(cli, widget_factory.clone()).await?;
Expand All @@ -184,7 +186,15 @@ async fn start_app(app: &mut tauri::App, cli: Cli) -> anyhow::Result<()> {
SysTray::new(app.handle(), config.clone(), widget_factory.clone())
.await?;

listen_events(app.handle(), config, monitor_state, widget_factory, tray);
listen_events(
app.handle(),
config,
monitor_state,
widget_factory,
tray,
manager,
emit_rx,
);

Ok(())
}
Expand All @@ -194,7 +204,9 @@ fn listen_events(
config: Arc<Config>,
monitor_state: Arc<MonitorState>,
widget_factory: Arc<WidgetFactory>,
tray: Arc<SysTray>,
tray: SysTray,
manager: Arc<ProviderManager>,
mut emit_rx: mpsc::UnboundedReceiver<ProviderEmission>,
) {
let app_handle = app_handle.clone();
let mut widget_open_rx = widget_factory.open_tx.subscribe();
Expand Down Expand Up @@ -231,6 +243,12 @@ fn listen_events(
info!("Widget configs changed.");
widget_factory.relaunch_by_paths(&changed_configs.keys().cloned().collect()).await
},
Some(provider_emission) = emit_rx.recv() => {
info!("Provider emission: {:?}", provider_emission);
app_handle.emit("provider-emit", provider_emission.clone());
manager.update_cache(provider_emission).await;
Ok(())
},
};

if let Err(err) = res {
Expand Down
56 changes: 25 additions & 31 deletions packages/desktop/src/providers/audio/audio_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ use std::{
time::Duration,
};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::{
sync::mpsc::{self, Sender},
sync::mpsc::{self},
task,
time::sleep,
};
use tracing::debug;
use windows::Win32::{
Expand All @@ -33,10 +31,11 @@ use windows::Win32::{
};
use windows_core::PCWSTR;

use crate::providers::{Provider, ProviderOutput, ProviderResult};
use crate::providers::{
CommonProviderState, Provider, ProviderEmitter, RuntimeType,
};

static PROVIDER_TX: OnceLock<mpsc::Sender<ProviderResult>> =
OnceLock::new();
static PROVIDER_TX: OnceLock<ProviderEmitter> = OnceLock::new();

static AUDIO_STATE: OnceLock<Arc<Mutex<AudioOutput>>> = OnceLock::new();

Expand Down Expand Up @@ -279,26 +278,29 @@ impl IMMNotificationClient_Impl for MediaDeviceEventHandler_Impl {
}

pub struct AudioProvider {
_config: AudioProviderConfig,
common: CommonProviderState,
}

impl AudioProvider {
pub fn new(config: AudioProviderConfig) -> Self {
Self { _config: config }
pub fn new(
_config: AudioProviderConfig,
common: CommonProviderState,
) -> Self {
Self { common }
}

fn emit_volume() {
if let Some(tx) = PROVIDER_TX.get() {
let output = AUDIO_STATE.get().unwrap().lock().unwrap().clone();
let _ = tx.try_send(Ok(ProviderOutput::Audio(output)).into());
tx.emit_output(Ok(output));
}
}

async fn handle_volume_updates(mut rx: mpsc::Receiver<(String, u32)>) {
fn handle_volume_updates(mut rx: mpsc::Receiver<(String, u32)>) {
const PROCESS_DELAY: Duration = Duration::from_millis(50);
let mut latest_updates = HashMap::new();

while let Some((device_id, volume)) = rx.recv().await {
while let Some((device_id, volume)) = rx.blocking_recv() {
latest_updates.insert(device_id, volume);

// Collect any additional pending updates without waiting.
Expand All @@ -307,7 +309,7 @@ impl AudioProvider {
}

// Brief delay to collect more potential updates.
sleep(PROCESS_DELAY).await;
std::thread::sleep(PROCESS_DELAY);

// Process all collected updates.
if let Some(state) = AUDIO_STATE.get() {
Expand Down Expand Up @@ -369,11 +371,14 @@ impl AudioProvider {
}
}

#[async_trait]
impl Provider for AudioProvider {
async fn run(&self, emit_result_tx: Sender<ProviderResult>) {
fn runtime_type(&self) -> RuntimeType {
RuntimeType::Sync
}

fn start_sync(&mut self) {
PROVIDER_TX
.set(emit_result_tx.clone())
.set(self.common.emitter.clone())
.expect("Error setting provider tx in audio provider");

AUDIO_STATE
Expand All @@ -383,22 +388,11 @@ impl Provider for AudioProvider {
// Create a channel for volume updates.
let (update_tx, update_rx) = mpsc::channel(100);

// Spawn both tasks.
let update_handler =
task::spawn(Self::handle_volume_updates(update_rx));

let manager = task::spawn_blocking(move || {
if let Err(err) = Self::create_audio_manager(update_tx) {
emit_result_tx
.blocking_send(Err(err).into())
.expect("Error with media provider");
}
});
// Spawn task for handling volume updates.
task::spawn_blocking(move || Self::handle_volume_updates(update_rx));

// Wait for either task to complete (though they should run forever).
tokio::select! {
_ = manager => debug!("Audio manager stopped unexpectedly"),
_ = update_handler => debug!("Update handler stopped unexpectedly"),
if let Err(err) = Self::create_audio_manager(update_tx) {
self.common.emitter.emit_output::<AudioOutput>(Err(err));
}
}
}
Loading
Loading