Skip to content

Commit

Permalink
test and fix interleaved host audio recv/send
Browse files Browse the repository at this point in the history
  • Loading branch information
Nico Chatzi committed Sep 23, 2023
1 parent 2132a9b commit f101aba
Show file tree
Hide file tree
Showing 18 changed files with 1,196 additions and 766 deletions.
6 changes: 3 additions & 3 deletions aud/cli/src/cmd/auscope/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
mod ui;

use aud::audio::HostedAudioProducer;
use aud::audio::HostAudioInput;
use ratatui::prelude::*;

type AuscopeApp = aud::apps::auscope::app::App<HostedAudioProducer>;
type AuscopeApp = aud::apps::auscope::app::App<HostAudioInput>;

struct TerminalApp {
app: AuscopeApp,
Expand All @@ -12,7 +12,7 @@ struct TerminalApp {

impl Default for TerminalApp {
fn default() -> Self {
let app = AuscopeApp::with_audio_receiver(HostedAudioProducer::default());
let app = AuscopeApp::with_audio_receiver(HostAudioInput::default());
let mut ui = ui::Ui::default();
ui.update_device_names(app.devices());

Expand Down
121 changes: 73 additions & 48 deletions aud/lib/examples/udp_audio_rx.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use aud_lib::audio::*;
use aud_lib::comms::*;
use crossbeam::channel::{Receiver, Sender};
use std::net::UdpSocket;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread::sleep;
use std::thread::JoinHandle;
use std::time::Duration;

#[derive(Default, Debug)]
pub struct AudioInfo {
Expand All @@ -18,22 +22,49 @@ impl From<AudioBuffer> for AudioInfo {
}
}

fn main() -> anyhow::Result<()> {
setup_logger()?;
struct LoggingAudioConsumer {
buffers: Arc<Mutex<Vec<AudioBuffer>>>,
_handle: JoinHandle<()>,
}

let mut rx = RemoteAudioReceiver::with_address(Sockets {
socket: UdpSocket::bind("127.0.0.1:8080").unwrap(),
target: "127.0.0.1:8081".parse().unwrap(),
})
.unwrap();
impl Default for LoggingAudioConsumer {
fn default() -> Self {
let buffers = Arc::new(Mutex::new(vec![]));
let handle = std::thread::spawn({
let buffers = buffers.clone();

let (sender, receiver) = crossbeam::channel::bounded(100);
run_buffer_count_logger_task(receiver);
wait_for_list_of_devices(&mut rx);
request_audio_device_connection(&mut rx);
wait_for_audio_device_connection(&mut rx);
fetch_audio(&mut rx, sender);
Ok(())
move || loop {
let stats = buffers.try_lock().unwrap().iter().fold(
AudioInfo::default(),
|stats: AudioInfo, buffer: &AudioBuffer| AudioInfo {
num_samples: stats.num_samples + buffer.num_frames() as u32,
num_channels: stats.num_channels + buffer.num_channels as u32,
},
);
buffers.try_lock().unwrap().clear();
log::info!(
"last second : received {} samples, with {} buffers",
stats.num_samples,
stats.num_channels
);
sleep(Duration::from_millis(1_000));
}
});

Self {
_handle: handle,
buffers,
}
}
}

impl AudioConsuming for LoggingAudioConsumer {
fn consume_audio_buffer(&mut self, buffer: AudioBuffer) -> anyhow::Result<()> {
log::info!("{buffer:?}");
let mut buffers = self.buffers.lock().unwrap();
buffers.push(buffer);
Ok(())
}
}

fn setup_logger() -> Result<(), fern::InitError> {
Expand All @@ -52,29 +83,7 @@ fn setup_logger() -> Result<(), fern::InitError> {
Ok(())
}

fn run_buffer_count_logger_task(receiver: Receiver<AudioInfo>) {
std::thread::spawn(move || loop {
let stats = receiver
.try_iter()
.fold(AudioInfo::default(), |acc, info| AudioInfo {
num_channels: acc.num_channels + info.num_channels,
num_samples: acc.num_samples + info.num_samples,
});

log::info!("{stats:?}");
std::thread::sleep(std::time::Duration::from_millis(1_000));
});
}

fn wait_for_list_of_devices(rx: &mut RemoteAudioReceiver) {
while rx.list_audio_devices().is_empty() {
std::thread::sleep(std::time::Duration::from_millis(1_000));
rx.process_audio_events().unwrap();
log::info!("reattempting to get devices");
}
}

fn request_audio_device_connection(rx: &mut RemoteAudioReceiver) {
fn request_audio_device_connection(rx: &mut RemoteAudioReceiver<LoggingAudioConsumer>) {
let devices = rx.list_audio_devices().to_vec();
log::info!("found devices : {devices:#?}");
let channels = AudioChannelSelection::Mono(0);
Expand All @@ -86,19 +95,35 @@ fn request_audio_device_connection(rx: &mut RemoteAudioReceiver) {
);
}

fn wait_for_audio_device_connection(rx: &mut RemoteAudioReceiver) {
while rx.retrieve_audio_buffer().data.is_empty() {
fn main() -> anyhow::Result<()> {
setup_logger()?;

let mut rx = RemoteAudioReceiver::new(
LoggingAudioConsumer::default(),
Sockets {
socket: UdpSocket::bind("127.0.0.1:8080").unwrap(),
target: "127.0.0.1:8081".parse().unwrap(),
},
)
.unwrap();

while rx.list_audio_devices().is_empty() {
sleep(Duration::from_millis(1_000));
rx.process_audio_events().unwrap();
log::info!("reattempting to get devices");
}

request_audio_device_connection(&mut rx);

while !rx.is_accessible() {
rx.process_audio_events().unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
sleep(Duration::from_millis(100));
}
}

fn fetch_audio(rx: &mut RemoteAudioReceiver, sender: Sender<AudioInfo>) {
loop {
while rx.is_accessible() {
rx.process_audio_events().unwrap();
let audio = rx.retrieve_audio_buffer();
if !audio.data.is_empty() {
sender.send(audio.into()).unwrap();
}
sleep(Duration::from_millis(10));
}

Ok(())
}
16 changes: 7 additions & 9 deletions aud/lib/examples/udp_audio_tx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use aud_lib::audio::*;
use aud_lib::comms::*;
use std::net::UdpSocket;
use std::thread::sleep;
use std::time::Duration;

fn setup_logger() -> Result<(), fern::InitError> {
fern::Dispatch::new()
Expand Down Expand Up @@ -28,24 +30,20 @@ fn main() -> anyhow::Result<()> {

log::info!("socket opened");

let mut tx = RemoteAudioTransmitter::new(sockets, HostedAudioProducer::default()).unwrap();
let mut tx = RemoteAudioTransmitter::new(HostAudioInput::default(), sockets).unwrap();

while !tx.is_audio_connected() {
if let Err(e) = tx.process_requests() {
while !tx.is_accessible() {
if let Err(e) = tx.process_audio_events() {
log::error!("failed to process requests : {e}");
}

std::thread::sleep(std::time::Duration::from_millis(100));
sleep(Duration::from_millis(100));
}

log::info!("connected to audio device");
tx.purge_audio_cache();

while tx.is_audio_connected() {
while tx.is_accessible() {
tx.process_audio_events().unwrap();
if let Err(e) = tx.try_send_audio() {
log::error!("failed to send audio : {e}");
}
}

Ok(())
Expand Down
7 changes: 5 additions & 2 deletions aud/lib/src/apps/auscope/app.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use crate::audio::*;

pub struct App<AudioReceiver: AudioProviding> {
pub struct App<AudioReceiver> {
audio_buffer: AudioBuffer,
audio_receiver: AudioReceiver,
audio_device: Option<AudioDevice>,
}

impl<AudioReceiver: AudioProviding> App<AudioReceiver> {
impl<AudioReceiver> App<AudioReceiver>
where
AudioReceiver: AudioProviding + AudioInterface,
{
pub fn with_audio_receiver(audio_receiver: AudioReceiver) -> Self {
Self {
audio_buffer: AudioBuffer::default(),
Expand Down
45 changes: 28 additions & 17 deletions aud/lib/src/audio/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
slice,
};

#[repr(C)]
pub struct FfiAudioReceiver {
sender: Sender<AudioBuffer>,
receiver: Receiver<AudioBuffer>,
Expand All @@ -33,7 +34,7 @@ impl Default for FfiAudioReceiver {
}
}

impl AudioProviding for FfiAudioReceiver {
impl AudioInterface for FfiAudioReceiver {
fn is_accessible(&self) -> bool {
self.selected_source.is_some()
}
Expand All @@ -47,7 +48,7 @@ impl AudioProviding for FfiAudioReceiver {
audio_device: &AudioDevice,
channel_selection: AudioChannelSelection,
) -> anyhow::Result<()> {
if channel_selection.is_valid_for_device(audio_device) {
if !audio_device.supports_channels(&channel_selection) {
log::error!("Invalid selection : {channel_selection:#?} for : {audio_device:#?}");
return Ok(());
}
Expand All @@ -62,10 +63,6 @@ impl AudioProviding for FfiAudioReceiver {
Ok(())
}

fn retrieve_audio_buffer(&mut self) -> AudioBuffer {
std::mem::take(&mut self.audio)
}

fn process_audio_events(&mut self) -> anyhow::Result<()> {
match self.receiver.try_recv() {
Ok(mut audio) => {
Expand All @@ -80,6 +77,12 @@ impl AudioProviding for FfiAudioReceiver {
}
}

impl AudioProviding for FfiAudioReceiver {
fn retrieve_audio_buffer(&mut self) -> AudioBuffer {
std::mem::take(&mut self.audio)
}
}

#[repr(C)]
pub struct aud_audio_device_t {
name: *const c_char,
Expand All @@ -94,7 +97,7 @@ pub struct aud_audio_device_t {
/// stream to any `aud` audio
/// consumer
#[no_mangle]
pub extern "C" fn aud_audio_stream_create() -> *mut c_void {
pub extern "C" fn aud_audio_provider_create() -> *mut c_void {
let audio = Box::<FfiAudioReceiver>::default();
Box::into_raw(audio) as *mut _
}
Expand All @@ -104,7 +107,7 @@ pub extern "C" fn aud_audio_stream_create() -> *mut c_void {
/// Not thread-safe, needs to be called from the same
/// thread that calls `create()`
#[no_mangle]
pub unsafe extern "C" fn aud_audio_stream_set_sources(
pub unsafe extern "C" fn aud_audio_provider_set_sources(
ctx: *mut c_void,
sources: *const aud_audio_device_t,
num_sources: c_uint,
Expand Down Expand Up @@ -135,11 +138,11 @@ pub unsafe extern "C" fn aud_audio_stream_set_sources(
///
/// The caller must supply the name of this source
#[no_mangle]
pub unsafe extern "C" fn aud_audio_stream_push(
pub unsafe extern "C" fn aud_audio_provider_push(
ctx: *mut c_void,
source_name: *mut c_char,
interleaved_buffer: *const f32,
num_samples: c_uint,
num_frames: c_uint,
num_channels: c_uint,
) {
if ctx.is_null() {
Expand Down Expand Up @@ -167,10 +170,10 @@ pub unsafe extern "C" fn aud_audio_stream_push(
return;
}

let data = slice::from_raw_parts(interleaved_buffer, (num_channels * num_samples) as usize);
let data = slice::from_raw_parts(interleaved_buffer, (num_channels * num_frames) as usize);
let num_requested_channels = receiver.selected_channels.len() as u32;
let mut write_chan = 0;
let mut buffer = AudioBuffer::new(num_samples, num_requested_channels);
let mut buffer = AudioBuffer::with_frames(num_frames, num_requested_channels);
for (chan, frame) in data.chunks(num_channels as usize).enumerate() {
if !receiver.selected_channels.contains(&chan) {
continue;
Expand All @@ -188,12 +191,11 @@ pub unsafe extern "C" fn aud_audio_stream_push(
}
}

/// Clean up the Audio Stream
/// instance. Ensure the validity
/// of the pointer, it must have
/// been create by a `create`
/// Destroy the instance for clean up.
///
/// Ensure the validity of the pointer, it must have been create by a `create`
#[no_mangle]
pub extern "C" fn aud_audio_stream_destroy(ctx: *mut c_void) {
pub extern "C" fn aud_audio_provider_destroy(ctx: *mut c_void) {
if ctx.is_null() {
return;
}
Expand All @@ -202,3 +204,12 @@ pub extern "C" fn aud_audio_stream_destroy(ctx: *mut c_void) {
let _sender: Box<FfiAudioReceiver> = Box::from_raw(ctx as *mut _);
}
}

// #[no_mangle]
// pub extern "C" fn aud_audio_consumer_create() -> *mut c_void {}
//
// #[no_mangle]
// pub extern "C" fn aud_audio_consumer_consume() -> FfiAudioBuffer {}
//
// #[no_mangle]
// pub extern "C" fn aud_audio_consumer_destroy() {}
Loading

0 comments on commit f101aba

Please sign in to comment.