Skip to content

Commit

Permalink
debouncer
Browse files Browse the repository at this point in the history
  • Loading branch information
HolbyFPV committed Nov 16, 2024
1 parent deee929 commit cb41df5
Showing 1 changed file with 64 additions and 29 deletions.
93 changes: 64 additions & 29 deletions packages/desktop/src/providers/audio/audio_provider.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex, OnceLock},
time::Duration,
};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::{
sync::mpsc::{self, Sender},
task,
time::sleep,
};
use windows::Win32::{
Devices::FunctionDiscovery::PKEY_Device_FriendlyName,
Expand Down Expand Up @@ -78,14 +80,19 @@ struct MediaDeviceEventHandler {
enumerator: IMMDeviceEnumerator,
device_state: Arc<Mutex<HashMap<String, DeviceInfo>>>,
current_device: String,
update_tx: mpsc::Sender<(String, f32)>,
}

impl MediaDeviceEventHandler {
fn new(enumerator: IMMDeviceEnumerator) -> Self {
fn new(
enumerator: IMMDeviceEnumerator,
update_tx: mpsc::Sender<(String, f32)>,
) -> Self {
Self {
enumerator,
device_state: Arc::new(Mutex::new(HashMap::new())),
current_device: String::new(),
update_tx,
}
}

Expand Down Expand Up @@ -171,14 +178,11 @@ impl MediaDeviceEventHandler {
impl Drop for MediaDeviceEventHandler {
fn drop(&mut self) {
unsafe {
let mut device_state = self.device_state.lock().unwrap();
for (device_id, device_info) in device_state.iter() {
device_info
.endpoint_volume
.UnregisterControlChangeNotify(
&IAudioEndpointVolumeCallback::from(self.clone()),
)
.expect("Failed to unregister volume callback");
let device_state = self.device_state.lock().unwrap();
for (_, device_info) in device_state.iter() {
let _ = device_info.endpoint_volume.UnregisterControlChangeNotify(
&IAudioEndpointVolumeCallback::from(self.clone()),
);
}
}
}
Expand All @@ -191,21 +195,9 @@ impl IAudioEndpointVolumeCallback_Impl for MediaDeviceEventHandler_Impl {
) -> windows::core::Result<()> {
unsafe {
if let Some(data) = data.as_ref() {
let device_id = &*self.current_device;
println!("Got notification for device: {}", device_id);

if let Some(state) = AUDIO_STATE.get() {
let mut output = state.lock().unwrap();
if let Some(device) = output.devices.get_mut(device_id) {
device.volume = data.fMasterVolume;
println!(
"Volume update for {} (ID: {}): {}",
device.name, device_id, data.fMasterVolume
);
drop(output);
AudioProvider::emit_volume();
}
}
let device_id = self.current_device.clone();
let volume = data.fMasterVolume;
let _ = self.update_tx.blocking_send((device_id, volume));
}
Ok(())
}
Expand Down Expand Up @@ -268,17 +260,48 @@ impl AudioProvider {
fn emit_volume() {
if let Some(tx) = PROVIDER_TX.get() {
let output = AUDIO_STATE.get().unwrap().lock().unwrap().clone();
println!("Emitting audio output: {:#?}", output);
let _ = tx.try_send(Ok(ProviderOutput::Audio(output)).into());
}
}

fn create_audio_manager() -> anyhow::Result<()> {
async fn handle_volume_updates(mut rx: mpsc::Receiver<(String, f32)>) {
const PROCESS_DELAY: Duration = Duration::from_millis(50);
let mut latest_updates: HashMap<String, f32> = HashMap::new();

while let Some((device_id, volume)) = rx.recv().await {
latest_updates.insert(device_id, volume);

// Collect any additional pending updates without waiting
while let Ok((device_id, volume)) = rx.try_recv() {
latest_updates.insert(device_id, volume);
}

// Brief delay to collect more potential updates
sleep(PROCESS_DELAY).await;

// Process all collected updates
if let Some(state) = AUDIO_STATE.get() {
let mut output = state.lock().unwrap();
for (device_id, volume) in latest_updates.drain() {
if let Some(device) = output.devices.get_mut(&device_id) {
device.volume = volume;
}
}
drop(output);
Self::emit_volume();
}
}
}

fn create_audio_manager(
update_tx: mpsc::Sender<(String, f32)>,
) -> anyhow::Result<()> {
unsafe {
let _ = CoInitializeEx(None, COINIT_MULTITHREADED);
let enumerator: IMMDeviceEnumerator =
CoCreateInstance(&MMDeviceEnumerator, None, CLSCTX_ALL)?;
let handler = MediaDeviceEventHandler::new(enumerator.clone());
let handler =
MediaDeviceEventHandler::new(enumerator.clone(), update_tx);

handler.enumerate_devices()?;

Expand Down Expand Up @@ -306,12 +329,24 @@ impl Provider for AudioProvider {
.set(Arc::new(Mutex::new(AudioOutput::new())))
.expect("Error setting initial audio state");

task::spawn_blocking(move || {
if let Err(err) = Self::create_audio_manager() {
// Create a channel for volume updates
let (update_tx, update_rx) = mpsc::channel(100);

// Spawn both tasks
let update_handler =
task::spawn(Self::handle_volume_updates(update_rx));
let manager = task::spawn_blocking(move || {
if let Err(err) = Self::create_audio_manager(update_tx) {
emit_result_tx
.blocking_send(Err(err).into())
.expect("Error with media provider");
}
});

// Wait for either task to complete (though they should run forever)
tokio::select! {
_ = manager => println!("Audio manager stopped unexpectedly"),
_ = update_handler => println!("Update handler stopped unexpectedly"),
}
}
}

0 comments on commit cb41df5

Please sign in to comment.