From 04a5bdb7937ac39ac9d096812264fcd019b63a0e Mon Sep 17 00:00:00 2001 From: Andreas Pehrson Date: Tue, 10 Oct 2023 22:59:36 +0200 Subject: [PATCH] Always try a VoiceProcessingIO AudioUnit The drift correction provided by aggregate devices has proven unreliable as a usb input paired with builtin or connected analog speakers often underruns within a few minutes, depending on clock drift. The VoiceProcessingIO AudioUnit handles drift properly. This patch will try to set up the VoiceProcessingIO AudioUnit whenever we're in duplex. This patch also sets up a failover path so that if setting up the VoiceProcessingIO AudioUnit fails, we try an aggregate device or a plain AudioUnit, in that order. For now, always in bypass mode, as we don't yet have an api to toggle audio processing features dynamically. Note that the VoiceProcessingIO AudioUnit will implicitly enable audio ducking in the platform. --- run_device_tests.sh | 5 +- src/backend/mod.rs | 384 ++++++++++++++++++++++++++++++++------------ 2 files changed, 289 insertions(+), 100 deletions(-) diff --git a/run_device_tests.sh b/run_device_tests.sh index b1a8f1e9..ae6c3eb3 100755 --- a/run_device_tests.sh +++ b/run_device_tests.sh @@ -14,7 +14,10 @@ cargo test test_plug_and_unplug_device -- --ignored --nocapture cargo test test_register_device_changed_callback_to_check_default_device_changed_input -- --ignored --nocapture cargo test test_register_device_changed_callback_to_check_default_device_changed_output -- --ignored --nocapture -cargo test test_register_device_changed_callback_to_check_default_device_changed_duplex -- --ignored --nocapture + +# FIXME: The test will hang if the default input or output is an aggregate device (and VoiceProcessingIO is) +# cargo test test_register_device_changed_callback_to_check_default_device_changed_duplex -- --ignored --nocapture + cargo test test_register_device_changed_callback_to_check_input_alive_changed_input -- --ignored --nocapture cargo test test_register_device_changed_callback_to_check_input_alive_changed_duplex -- --ignored --nocapture diff --git a/src/backend/mod.rs b/src/backend/mod.rs index 973923f8..b990845d 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -42,7 +42,7 @@ use std::cmp; use std::ffi::{CStr, CString}; use std::fmt; use std::mem; -use std::os::raw::c_void; +use std::os::raw::{c_uint, c_void}; use std::ptr; use std::slice; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; @@ -55,6 +55,7 @@ const AU_IN_BUS: AudioUnitElement = 1; const DISPATCH_QUEUE_LABEL: &str = "org.mozilla.cubeb"; const PRIVATE_AGGREGATE_DEVICE_NAME: &str = "CubebAggregateDevice"; +const VOICEPROCESSING_AGGREGATE_DEVICE_NAME: &str = "VPAUAggregateAudioDevice"; // Testing empirically, some headsets report a minimal latency that is very low, // but this does not work in practice. Lie and say the minimum is 128 frames. @@ -464,8 +465,11 @@ extern "C" fn audiounit_input_callback( }; // If the input (input-only stream) or the output is drained (duplex stream), - // cancel this callback. - if stm.draining.load(Ordering::SeqCst) { + // cancel this callback. Note that for voice processing cases (a single unit), + // the output callback handles stopping the unit and notifying of state. + if stm.core_stream_data.input_unit != stm.core_stream_data.output_unit + && stm.draining.load(Ordering::SeqCst) + { let r = stop_audiounit(stm.core_stream_data.input_unit); assert!(r.is_ok()); // Only fire state-changed callback for input-only stream. 
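Note on the hunk above: with the VoiceProcessingIO path, `input_unit` and `output_unit` hold the same AudioUnit, so the input callback must leave drain handling (stopping the unit and firing the state callback) to the output callback. The standalone sketch below, which is not part of the patch and uses simplified placeholder types rather than the real cubeb-coreaudio-rs types, illustrates the check the callback now makes:

    // Sketch only: `AudioUnitHandle` stands in for the real AudioUnit pointer.
    type AudioUnitHandle = usize;

    struct Units {
        input_unit: AudioUnitHandle,
        output_unit: AudioUnitHandle,
    }

    impl Units {
        // Mirrors the patched condition `input_unit != output_unit && draining`:
        // only a stream with a dedicated input unit stops it from the input callback.
        fn input_callback_stops_on_drain(&self, draining: bool) -> bool {
            self.input_unit != self.output_unit && draining
        }
    }

    fn main() {
        let shared_vpio = Units { input_unit: 1, output_unit: 1 };
        let split_units = Units { input_unit: 1, output_unit: 2 };
        assert!(!shared_vpio.input_callback_stops_on_drain(true));
        assert!(split_units.input_callback_stops_on_drain(true));
    }
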
@@ -963,6 +967,7 @@ fn create_audiounit(device: &device_info) -> Result { .contains(device_flags::DEV_INPUT | device_flags::DEV_OUTPUT)); let unit = create_blank_audiounit()?; + let mut bus = AU_OUT_BUS; if device.flags.contains(device_flags::DEV_INPUT) { // Input only. @@ -976,6 +981,7 @@ fn create_audiounit(device: &device_info) -> Result { dispose_audio_unit(unit); return Err(Error::error()); } + bus = AU_IN_BUS; } if device.flags.contains(device_flags::DEV_OUTPUT) { @@ -990,9 +996,10 @@ fn create_audiounit(device: &device_info) -> Result { dispose_audio_unit(unit); return Err(Error::error()); } + bus = AU_OUT_BUS; } - if let Err(e) = set_device_to_audiounit(unit, device.id) { + if let Err(e) = set_device_to_audiounit(unit, device.id, bus) { cubeb_log!( "Failed to set device {} to the created audiounit. Error: {}", device.id, @@ -1005,6 +1012,55 @@ fn create_audiounit(device: &device_info) -> Result { Ok(unit) } +fn create_voiceprocessing_audiounit( + in_device: &device_info, + out_device: &device_info, +) -> Result { + assert!(in_device.flags.contains(device_flags::DEV_INPUT)); + assert!(!in_device.flags.contains(device_flags::DEV_OUTPUT)); + assert!(!out_device.flags.contains(device_flags::DEV_INPUT)); + assert!(out_device.flags.contains(device_flags::DEV_OUTPUT)); + + let unit = create_typed_audiounit(kAudioUnitSubType_VoiceProcessingIO)?; + + if let Err(e) = set_device_to_audiounit(unit, in_device.id, AU_IN_BUS) { + cubeb_log!( + "Failed to set in device {} to the created audiounit. Error: {}", + in_device.id, + e + ); + dispose_audio_unit(unit); + return Err(Error::error()); + } + + if let Err(e) = set_device_to_audiounit(unit, out_device.id, AU_OUT_BUS) { + cubeb_log!( + "Failed to set out device {} to the created audiounit. Error: {}", + out_device.id, + e + ); + dispose_audio_unit(unit); + return Err(Error::error()); + } + + let bypass = u32::from(true); + let r = audio_unit_set_property( + unit, + kAudioUnitProperty_BypassEffect, + kAudioUnitScope_Global, + AU_IN_BUS, + &bypass, + mem::size_of::(), + ); + if r != NO_ERR { + cubeb_log!("Failed to enable bypass of voiceprocessing. 
Error: {}", r); + dispose_audio_unit(unit); + return Err(Error::error()); + } + + Ok(unit) +} + fn enable_audiounit_scope( unit: AudioUnit, devtype: DeviceType, @@ -1039,6 +1095,7 @@ fn enable_audiounit_scope( fn set_device_to_audiounit( unit: AudioUnit, device_id: AudioObjectID, + bus: AudioUnitElement, ) -> std::result::Result<(), OSStatus> { assert!(!unit.is_null()); @@ -1046,7 +1103,7 @@ fn set_device_to_audiounit( unit, kAudioOutputUnitProperty_CurrentDevice, kAudioUnitScope_Global, - 0, + bus, &device_id, mem::size_of::(), ); @@ -1057,13 +1114,10 @@ fn set_device_to_audiounit( } } -fn create_blank_audiounit() -> Result { +fn create_typed_audiounit(sub_type: c_uint) -> Result { let desc = AudioComponentDescription { componentType: kAudioUnitType_Output, - #[cfg(not(target_os = "ios"))] - componentSubType: kAudioUnitSubType_HALOutput, - #[cfg(target_os = "ios")] - componentSubType: kAudioUnitSubType_RemoteIO, + componentSubType: sub_type, componentManufacturer: kAudioUnitManufacturer_Apple, componentFlags: 0, componentFlagsMask: 0, @@ -1085,6 +1139,13 @@ fn create_blank_audiounit() -> Result { } } +fn create_blank_audiounit() -> Result { + #[cfg(not(target_os = "ios"))] + return create_typed_audiounit(kAudioUnitSubType_HALOutput); + #[cfg(target_os = "ios")] + return create_typed_audiounit(kAudioUnitSubType_RemoteIO); +} + fn get_buffer_size(unit: AudioUnit, devtype: DeviceType) -> std::result::Result { assert!(!unit.is_null()); let (scope, element) = match devtype { @@ -1671,6 +1732,7 @@ fn audiounit_get_devices_of_type(devtype: DeviceType) -> Vec { } else if let Ok(uid) = get_device_global_uid(device) { let uid = uid.into_string(); !uid.contains(PRIVATE_AGGREGATE_DEVICE_NAME) + && !uid.contains(VOICEPROCESSING_AGGREGATE_DEVICE_NAME) } else { // Fail to get device uid. true @@ -2400,7 +2462,7 @@ impl<'ctx> CoreStreamData<'ctx> { if !self.input_unit.is_null() { start_audiounit(self.input_unit)?; } - if !self.output_unit.is_null() { + if self.input_unit != self.output_unit && !self.output_unit.is_null() { start_audiounit(self.output_unit)?; } Ok(()) @@ -2411,7 +2473,7 @@ impl<'ctx> CoreStreamData<'ctx> { let r = stop_audiounit(self.input_unit); assert!(r.is_ok()); } - if !self.output_unit.is_null() { + if self.input_unit != self.output_unit && !self.output_unit.is_null() { let r = stop_audiounit(self.output_unit); assert!(r.is_ok()); } @@ -2425,44 +2487,6 @@ impl<'ctx> CoreStreamData<'ctx> { self.output_stream_params.rate() > 0 } - fn should_use_aggregate_device(&self) -> bool { - // It's impossible to create an aggregate device from an aggregate device, and it's - // unnecessary to create an aggregate device when opening the same device input/output. In - // all other cases, use an aggregate device. 
- let mut either_already_aggregate = false; - if self.has_input() { - let input_is_aggregate = - get_device_transport_type(self.input_device.id, DeviceType::INPUT).unwrap_or(0) - == kAudioDeviceTransportTypeAggregate; - if input_is_aggregate { - either_already_aggregate = true; - } - cubeb_log!( - "Input device ID: {} (aggregate: {:?})", - self.input_device.id, - input_is_aggregate - ); - } - if self.has_output() { - let output_is_aggregate = - get_device_transport_type(self.output_device.id, DeviceType::OUTPUT).unwrap_or(0) - == kAudioDeviceTransportTypeAggregate; - if output_is_aggregate { - either_already_aggregate = true; - } - cubeb_log!( - "Output device ID: {} (aggregate: {:?})", - self.input_device.id, - output_is_aggregate - ); - } - // Only use an aggregate device when the device are different. - self.has_input() - && self.has_output() - && self.input_device.id != self.output_device.id - && !either_already_aggregate - } - fn same_clock_domain(&self) -> bool { // If not setting up a duplex stream, there is only one device, // no reclocking necessary. @@ -2487,6 +2511,164 @@ impl<'ctx> CoreStreamData<'ctx> { input_domain == output_domain } + fn create_audiounits(&mut self) -> Result<(device_info, device_info)> { + let should_use_voice_processing_unit = self.has_input() && self.has_output(); + + let should_use_aggregate_device = { + // It's impossible to create an aggregate device from an aggregate device, and it's + // unnecessary to create an aggregate device when opening the same device input/output. In + // all other cases, use an aggregate device. + let mut either_already_aggregate = false; + if self.has_input() { + let input_is_aggregate = + get_device_transport_type(self.input_device.id, DeviceType::INPUT).unwrap_or(0) + == kAudioDeviceTransportTypeAggregate; + if input_is_aggregate { + either_already_aggregate = true; + } + cubeb_log!( + "Input device ID: {} (aggregate: {:?})", + self.input_device.id, + input_is_aggregate + ); + } + if self.has_output() { + let output_is_aggregate = + get_device_transport_type(self.output_device.id, DeviceType::OUTPUT) + .unwrap_or(0) + == kAudioDeviceTransportTypeAggregate; + if output_is_aggregate { + either_already_aggregate = true; + } + cubeb_log!( + "Output device ID: {} (aggregate: {:?})", + self.input_device.id, + output_is_aggregate + ); + } + // Only use an aggregate device when the device are different. + self.has_input() + && self.has_output() + && self.input_device.id != self.output_device.id + && !either_already_aggregate + }; + + // Create an AudioUnit: + // - If we're eligible to use voice processing, try creating a VoiceProcessingIO AudioUnit. + // - If we should use an aggregate device, try creating one and input and output AudioUnits next. + // - As last resort, create regular AudioUnits. This is also the normal non-duplex path. + + if should_use_voice_processing_unit { + if let Ok(au) = + create_voiceprocessing_audiounit(&self.input_device, &self.output_device) + { + cubeb_log!("({:p}) Using VoiceProcessingIO AudioUnit", self.stm_ptr); + self.input_unit = au; + self.output_unit = au; + return Ok((self.input_device.clone(), self.output_device.clone())); + } + cubeb_log!( + "({:p}) Failed to create VoiceProcessingIO AudioUnit. 
Trying a regular one.", + self.stm_ptr + ); + } + + if should_use_aggregate_device { + if let Ok(device) = AggregateDevice::new(self.input_device.id, self.output_device.id) { + let in_dev_info = { + device_info { + id: device.get_device_id(), + ..self.input_device + } + }; + let out_dev_info = { + device_info { + id: device.get_device_id(), + ..self.output_device + } + }; + + match ( + create_audiounit(&in_dev_info), + create_audiounit(&out_dev_info), + ) { + (Ok(in_au), Ok(out_au)) => { + cubeb_log!( + "({:p}) Using an aggregate device {} for input and output.", + self.stm_ptr, + self.aggregate_device.as_ref().unwrap().get_device_id() + ); + self.aggregate_device = Some(device); + self.input_unit = in_au; + self.output_unit = out_au; + return Ok((in_dev_info, out_dev_info)); + } + (Err(e), Ok(au)) => { + cubeb_log!( + "({:p}) Failed to create input AudioUnit for aggregate device. Error: {}.", + self.stm_ptr, + e + ); + dispose_audio_unit(au); + } + (Ok(au), Err(e)) => { + cubeb_log!( + "({:p}) Failed to create output AudioUnit for aggregate device. Error: {}.", + self.stm_ptr, + e + ); + dispose_audio_unit(au); + } + (Err(e), _) => { + cubeb_log!( + "({:p}) Failed to create AudioUnits for aggregate device. Error: {}.", + self.stm_ptr, + e + ); + } + } + } + cubeb_log!( + "({:p}) Failed to set up aggregate device. Using regular AudioUnits.", + self.stm_ptr + ); + } + + if self.has_input() { + match create_audiounit(&self.input_device) { + Ok(in_au) => self.input_unit = in_au, + Err(e) => { + cubeb_log!( + "({:p}) Failed to create regular AudioUnit for input. Error: {}", + self.stm_ptr, + e + ); + return Err(e); + } + } + } + + if self.has_output() { + match create_audiounit(&self.output_device) { + Ok(out_au) => self.output_unit = out_au, + Err(e) => { + cubeb_log!( + "({:p}) Failed to create regular AudioUnit for output. Error: {}", + self.stm_ptr, + e + ); + if !self.input_unit.is_null() { + dispose_audio_unit(self.input_unit); + self.input_unit = ptr::null_mut(); + } + return Err(e); + } + } + } + + Ok((self.input_device.clone(), self.output_device.clone())) + } + #[allow(clippy::cognitive_complexity)] // TODO: Refactoring. fn setup(&mut self) -> Result<()> { fn get_device_channel_count(id: AudioDeviceID, devtype: DeviceType) -> Option { @@ -2518,42 +2700,18 @@ impl<'ctx> CoreStreamData<'ctx> { return Err(Error::not_supported()); } - let mut in_dev_info = self.input_device.clone(); - let mut out_dev_info = self.output_device.clone(); let same_clock_domain = self.same_clock_domain(); - - if self.should_use_aggregate_device() { - match AggregateDevice::new(in_dev_info.id, out_dev_info.id) { - Ok(device) => { - in_dev_info.id = device.get_device_id(); - out_dev_info.id = device.get_device_id(); - in_dev_info.flags = device_flags::DEV_INPUT; - out_dev_info.flags = device_flags::DEV_OUTPUT; - self.aggregate_device = Some(device); - cubeb_log!( - "({:p}) Using an aggregate device {} for input and output.", - self.stm_ptr, - self.aggregate_device.as_ref().unwrap().get_device_id() - ); - } - Err(e) => { - cubeb_log!( - "({:p}) Creation of aggregate devices failed. 
Error: {}.\ - Using assigned devices directly instead.", - self.stm_ptr, - e - ); - } - } - } else { - cubeb_log!("Not using an aggregate device"); - } + let (in_dev_info, out_dev_info) = self.create_audiounits()?; + let using_voice_processing_unit = + !self.input_unit.is_null() && self.input_unit == self.output_unit; assert!(!self.stm_ptr.is_null()); let stream = unsafe { &(*self.stm_ptr) }; // Configure I/O stream if self.has_input() { + assert!(!self.input_unit.is_null()); + cubeb_log!( "({:p}) Initializing input by device info: {:?}", self.stm_ptr, @@ -2566,20 +2724,16 @@ impl<'ctx> CoreStreamData<'ctx> { return Err(Error::invalid_parameter()); } - self.input_unit = create_audiounit(&in_dev_info).map_err(|e| { - cubeb_log!("({:p}) AudioUnit creation for input failed.", self.stm_ptr); - e - })?; - cubeb_log!( - "({:p}) Opening input side: rate {}, channels {}, format {:?}, layout {:?}, prefs {:?}, latency in frames {}.", + "({:p}) Opening input side: rate {}, channels {}, format {:?}, layout {:?}, prefs {:?}, latency in frames {}, voice processing {}.", self.stm_ptr, self.input_stream_params.rate(), self.input_stream_params.channels(), self.input_stream_params.format(), self.input_stream_params.layout(), self.input_stream_params.prefs(), - stream.latency_frames + stream.latency_frames, + using_voice_processing_unit ); // Get input device hardware information. @@ -2615,7 +2769,15 @@ impl<'ctx> CoreStreamData<'ctx> { // channels to the audio callback. let params = unsafe { let mut p = *self.input_stream_params.as_ptr(); - p.channels = input_hw_desc.mChannelsPerFrame; + p.channels = if using_voice_processing_unit { + // VPIO's input_hw_desc.mChannelsPerFrame reports an oddly high number of channels + // (we've seen 7 channels reported for a mono webcam for instance). When doing + // voice processing, use what the device reported -- VPIO doesn't work on top of + // aggregate devices. + device_channel_count + } else { + input_hw_desc.mChannelsPerFrame + }; // Input AudioUnit must be configured with device's sample rate. // we will resample inside input callback. 
p.rate = input_hw_desc.mSampleRate as _; @@ -2684,7 +2846,9 @@ impl<'ctx> CoreStreamData<'ctx> { self.input_stream_params.format(), SAFE_MAX_LATENCY_FRAMES as usize, self.input_dev_desc.mChannelsPerFrame as usize, - (self.input_dev_desc.mChannelsPerFrame - device_channel_count) as usize, + self.input_dev_desc + .mChannelsPerFrame + .saturating_sub(device_channel_count) as usize, self.input_stream_params.channels() as usize, )); @@ -2719,17 +2883,14 @@ impl<'ctx> CoreStreamData<'ctx> { } if self.has_output() { + assert!(!self.output_unit.is_null()); + cubeb_log!( "({:p}) Initialize output by device info: {:?}", self.stm_ptr, out_dev_info ); - self.output_unit = create_audiounit(&out_dev_info).map_err(|e| { - cubeb_log!("({:p}) AudioUnit creation for output failed.", self.stm_ptr); - e - })?; - cubeb_log!( "({:p}) Opening output side: rate {}, channels {}, format {:?}, layout {:?}, prefs {:?}, latency in frames {}.", self.stm_ptr, @@ -2781,6 +2942,9 @@ impl<'ctx> CoreStreamData<'ctx> { let params = unsafe { let mut p = *self.output_stream_params.as_ptr(); p.channels = output_hw_desc.mChannelsPerFrame; + if using_voice_processing_unit { + p.rate = self.input_dev_desc.mSampleRate as _; + } StreamParams::from(p) }; @@ -2792,7 +2956,20 @@ impl<'ctx> CoreStreamData<'ctx> { e })?; - let device_layout = audiounit_get_current_channel_layout(self.output_unit); + // XXX + let device_layout = if using_voice_processing_unit { + let mut channels = Vec::with_capacity(output_hw_desc.mChannelsPerFrame as usize); + match output_hw_desc.mChannelsPerFrame { + 1 => channels.push(mixer::Channel::FrontCenter), + _ => { + channels.push(mixer::Channel::FrontLeft); + channels.push(mixer::Channel::FrontRight); + } + } + channels + } else { + audiounit_get_current_channel_layout(self.output_unit) + }; // The mixer will be set up when // 1. using aggregate device whose input device has output channels @@ -2905,7 +3082,9 @@ impl<'ctx> CoreStreamData<'ctx> { None }; let resampler_output_params = if self.has_output() { - Some(unsafe { *(self.output_stream_params.as_ptr()) }) + let mut p = unsafe { *(self.output_stream_params.as_ptr()) }; + p.rate = self.output_dev_desc.mSampleRate as u32; + Some(p) } else { None }; @@ -2955,10 +3134,12 @@ impl<'ctx> CoreStreamData<'ctx> { } if !self.output_unit.is_null() { - let r = audio_unit_initialize(self.output_unit); - if r != NO_ERR { - cubeb_log!("AudioUnitInitialize/output rv={}", r); - return Err(Error::error()); + if self.input_unit != self.output_unit { + let r = audio_unit_initialize(self.output_unit); + if r != NO_ERR { + cubeb_log!("AudioUnitInitialize/output rv={}", r); + return Err(Error::error()); + } } stream.output_device_latency_frames.store( @@ -3013,6 +3194,11 @@ impl<'ctx> CoreStreamData<'ctx> { } fn close(&mut self) { + if self.input_unit == self.output_unit { + // Handle the VoiceProcessIO case where there is a single unit. + self.output_unit = ptr::null_mut(); + } + if !self.input_unit.is_null() { audio_unit_uninitialize(self.input_unit); dispose_audio_unit(self.input_unit);
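
To summarize the control flow this patch introduces in `create_audiounits`: for duplex streams it first tries a single VoiceProcessingIO unit, then falls back to a private aggregate device with separate input and output units, and finally to plain per-direction AudioUnits, which is also the normal non-duplex path. The standalone sketch below is not the real implementation; all names are simplified placeholders used only to show the cascade:

    // Sketch only: which kind of unit(s) the fallback chain ends up with.
    enum Units {
        VoiceProcessing, // one shared unit for input and output
        Aggregate,       // separate units on a private aggregate device
        Plain,           // separate units on the original devices
    }

    // Placeholder constructors; the real code calls create_voiceprocessing_audiounit
    // and AggregateDevice::new, which can fail for device-specific reasons.
    fn try_voice_processing(ok: bool) -> Result<Units, ()> {
        if ok { Ok(Units::VoiceProcessing) } else { Err(()) }
    }

    fn try_aggregate(ok: bool) -> Result<Units, ()> {
        if ok { Ok(Units::Aggregate) } else { Err(()) }
    }

    fn create_units(duplex: bool, vp_ok: bool, agg_ok: bool) -> Units {
        if duplex {
            if let Ok(u) = try_voice_processing(vp_ok) {
                return u;
            }
            if let Ok(u) = try_aggregate(agg_ok) {
                return u;
            }
        }
        Units::Plain
    }

    fn main() {
        assert!(matches!(create_units(true, true, true), Units::VoiceProcessing));
        assert!(matches!(create_units(true, false, true), Units::Aggregate));
        assert!(matches!(create_units(true, false, false), Units::Plain));
        assert!(matches!(create_units(false, true, true), Units::Plain));
    }

In the patch itself the aggregate fallback is additionally gated on `should_use_aggregate_device` (different input and output devices, neither already an aggregate), and a shared VoiceProcessingIO unit is started, stopped, initialized, and disposed only once via the `input_unit == output_unit` checks added above.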