Skip to content

Commit

Permalink
test and fix interleaved host audio recv/send
Browse files Browse the repository at this point in the history
  • Loading branch information
Nico Chatzi committed Sep 23, 2023
1 parent 2132a9b commit 4637a2c
Show file tree
Hide file tree
Showing 22 changed files with 1,285 additions and 780 deletions.
4 changes: 4 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ test:
dev CMD='just b':
cargo watch -cs 'reset; {{CMD}}' -i 'res/*' -i 'out/*' -i 'lua/api/examples/*'

# run the benchmarks
bench:
cargo bench --features bench

# install the apps in a directory
install DIR='./out': build
#!/usr/bin/env bash
Expand Down
6 changes: 3 additions & 3 deletions aud/cli/src/cmd/auscope/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
mod ui;

use aud::audio::HostedAudioProducer;
use aud::audio::HostAudioInput;
use ratatui::prelude::*;

type AuscopeApp = aud::apps::auscope::app::App<HostedAudioProducer>;
type AuscopeApp = aud::apps::auscope::app::App<HostAudioInput>;

struct TerminalApp {
app: AuscopeApp,
Expand All @@ -12,7 +12,7 @@ struct TerminalApp {

impl Default for TerminalApp {
fn default() -> Self {
let app = AuscopeApp::with_audio_receiver(HostedAudioProducer::default());
let app = AuscopeApp::with_audio_receiver(HostAudioInput::default());
let mut ui = ui::Ui::default();
ui.update_device_names(app.devices());

Expand Down
1 change: 1 addition & 0 deletions aud/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ default = ["ffi", "udp"]
ffi = []
udp = ["serde", "bincode"]
vendored_lua = ["mlua/vendored"]
bench = []

[dependencies]
anyhow = { workspace = true }
Expand Down
53 changes: 53 additions & 0 deletions aud/lib/benches/host_audio_io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#[cfg(feature = "bench")]
mod bench {
use aud_lib::audio::*;
use criterion::{criterion_group, criterion_main, Criterion};
use crossbeam::channel::unbounded;
use rand::random;
use std::iter::repeat_with;

fn bench_enqueue(c: &mut Criterion) {
for buffer_size in [128, 256, 512, 1024] {
let mut group = c.benchmark_group(format!("Enqueue Mono Buffer Size {}", buffer_size));
group.bench_function("Mono", |b| {
let (sender, _receiver) = unbounded();
let input_channels = 1;
let selected_channels = vec![0];
let enqueue_fn = test_make_audio_buffer_enqueueing_function(
sender,
input_channels,
selected_channels,
);
let audio_buffer: Vec<f32> = repeat_with(|| random::<f32>())
.take(buffer_size * input_channels)
.collect();
b.iter(|| {
enqueue_fn(&audio_buffer);
});
});
group.finish();
}
}

fn bench_dequeue(c: &mut Criterion) {
for buffer_size in [128, 256, 512, 1024] {
let mut group =
c.benchmark_group(format!("Dequeue Stereo Buffer Size {}", buffer_size));
group.bench_function("Stereo", |b| {
let (_sender, receiver) = unbounded();
let total_channels = 2;
let selected_channels = vec![0, 1];
let dequeue_fn =
test_make_audio_dequeing_function(receiver, total_channels, selected_channels);
let mut audio_buffer: Vec<f32> = vec![0.0; buffer_size * total_channels];
b.iter(|| {
dequeue_fn(&mut audio_buffer);
});
});
group.finish();
}
}

criterion_group!(host_audio_io, bench_enqueue, bench_dequeue);
criterion_main!(host_audio_io);
}
121 changes: 73 additions & 48 deletions aud/lib/examples/udp_audio_rx.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use aud_lib::audio::*;
use aud_lib::comms::*;
use crossbeam::channel::{Receiver, Sender};
use std::net::UdpSocket;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread::sleep;
use std::thread::JoinHandle;
use std::time::Duration;

#[derive(Default, Debug)]
pub struct AudioInfo {
Expand All @@ -18,22 +22,49 @@ impl From<AudioBuffer> for AudioInfo {
}
}

fn main() -> anyhow::Result<()> {
setup_logger()?;
struct LoggingAudioConsumer {
buffers: Arc<Mutex<Vec<AudioBuffer>>>,
_handle: JoinHandle<()>,
}

let mut rx = RemoteAudioReceiver::with_address(Sockets {
socket: UdpSocket::bind("127.0.0.1:8080").unwrap(),
target: "127.0.0.1:8081".parse().unwrap(),
})
.unwrap();
impl Default for LoggingAudioConsumer {
fn default() -> Self {
let buffers = Arc::new(Mutex::new(vec![]));
let handle = std::thread::spawn({
let buffers = buffers.clone();

let (sender, receiver) = crossbeam::channel::bounded(100);
run_buffer_count_logger_task(receiver);
wait_for_list_of_devices(&mut rx);
request_audio_device_connection(&mut rx);
wait_for_audio_device_connection(&mut rx);
fetch_audio(&mut rx, sender);
Ok(())
move || loop {
let stats = buffers.try_lock().unwrap().iter().fold(
AudioInfo::default(),
|stats: AudioInfo, buffer: &AudioBuffer| AudioInfo {
num_samples: stats.num_samples + buffer.num_frames() as u32,
num_channels: stats.num_channels + buffer.num_channels as u32,
},
);
buffers.try_lock().unwrap().clear();
log::info!(
"last second : received {} samples, with {} buffers",
stats.num_samples,
stats.num_channels
);
sleep(Duration::from_millis(1_000));
}
});

Self {
_handle: handle,
buffers,
}
}
}

impl AudioConsuming for LoggingAudioConsumer {
fn consume_audio_buffer(&mut self, buffer: AudioBuffer) -> anyhow::Result<()> {
log::info!("{buffer:?}");
let mut buffers = self.buffers.lock().unwrap();
buffers.push(buffer);
Ok(())
}
}

fn setup_logger() -> Result<(), fern::InitError> {
Expand All @@ -52,29 +83,7 @@ fn setup_logger() -> Result<(), fern::InitError> {
Ok(())
}

fn run_buffer_count_logger_task(receiver: Receiver<AudioInfo>) {
std::thread::spawn(move || loop {
let stats = receiver
.try_iter()
.fold(AudioInfo::default(), |acc, info| AudioInfo {
num_channels: acc.num_channels + info.num_channels,
num_samples: acc.num_samples + info.num_samples,
});

log::info!("{stats:?}");
std::thread::sleep(std::time::Duration::from_millis(1_000));
});
}

fn wait_for_list_of_devices(rx: &mut RemoteAudioReceiver) {
while rx.list_audio_devices().is_empty() {
std::thread::sleep(std::time::Duration::from_millis(1_000));
rx.process_audio_events().unwrap();
log::info!("reattempting to get devices");
}
}

fn request_audio_device_connection(rx: &mut RemoteAudioReceiver) {
fn request_audio_device_connection(rx: &mut RemoteAudioReceiver<LoggingAudioConsumer>) {
let devices = rx.list_audio_devices().to_vec();
log::info!("found devices : {devices:#?}");
let channels = AudioChannelSelection::Mono(0);
Expand All @@ -86,19 +95,35 @@ fn request_audio_device_connection(rx: &mut RemoteAudioReceiver) {
);
}

fn wait_for_audio_device_connection(rx: &mut RemoteAudioReceiver) {
while rx.retrieve_audio_buffer().data.is_empty() {
fn main() -> anyhow::Result<()> {
setup_logger()?;

let mut rx = RemoteAudioReceiver::new(
LoggingAudioConsumer::default(),
Sockets {
socket: UdpSocket::bind("127.0.0.1:8080").unwrap(),
target: "127.0.0.1:8081".parse().unwrap(),
},
)
.unwrap();

while rx.list_audio_devices().is_empty() {
sleep(Duration::from_millis(1_000));
rx.process_audio_events().unwrap();
log::info!("reattempting to get devices");
}

request_audio_device_connection(&mut rx);

while !rx.is_accessible() {
rx.process_audio_events().unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
sleep(Duration::from_millis(100));
}
}

fn fetch_audio(rx: &mut RemoteAudioReceiver, sender: Sender<AudioInfo>) {
loop {
while rx.is_accessible() {
rx.process_audio_events().unwrap();
let audio = rx.retrieve_audio_buffer();
if !audio.data.is_empty() {
sender.send(audio.into()).unwrap();
}
sleep(Duration::from_millis(10));
}

Ok(())
}
16 changes: 7 additions & 9 deletions aud/lib/examples/udp_audio_tx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use aud_lib::audio::*;
use aud_lib::comms::*;
use std::net::UdpSocket;
use std::thread::sleep;
use std::time::Duration;

fn setup_logger() -> Result<(), fern::InitError> {
fern::Dispatch::new()
Expand Down Expand Up @@ -28,24 +30,20 @@ fn main() -> anyhow::Result<()> {

log::info!("socket opened");

let mut tx = RemoteAudioTransmitter::new(sockets, HostedAudioProducer::default()).unwrap();
let mut tx = RemoteAudioTransmitter::new(HostAudioInput::default(), sockets).unwrap();

while !tx.is_audio_connected() {
if let Err(e) = tx.process_requests() {
while !tx.is_accessible() {
if let Err(e) = tx.process_audio_events() {
log::error!("failed to process requests : {e}");
}

std::thread::sleep(std::time::Duration::from_millis(100));
sleep(Duration::from_millis(100));
}

log::info!("connected to audio device");
tx.purge_audio_cache();

while tx.is_audio_connected() {
while tx.is_accessible() {
tx.process_audio_events().unwrap();
if let Err(e) = tx.try_send_audio() {
log::error!("failed to send audio : {e}");
}
}

Ok(())
Expand Down
27 changes: 13 additions & 14 deletions aud/lib/include/aud.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ typedef struct aud_audio_device_t {
* stream to any `aud` audio
* consumer
*/
void *aud_audio_stream_create(void);
void *aud_audio_provider_create(void);

/**
* # Safety
*
* Not thread-safe, needs to be called from the same
* thread that calls `create()`
*/
void aud_audio_stream_set_sources(void *ctx,
const struct aud_audio_device_t *sources,
unsigned int num_sources);
void aud_audio_provider_set_sources(void *ctx,
const struct aud_audio_device_t *sources,
unsigned int num_sources);

/**
* # Safety
Expand All @@ -36,18 +36,17 @@ void aud_audio_stream_set_sources(void *ctx,
*
* The caller must supply the name of this source
*/
void aud_audio_stream_push(void *ctx,
char *source_name,
const float *interleaved_buffer,
unsigned int num_samples,
unsigned int num_channels);
void aud_audio_provider_push(void *ctx,
char *source_name,
const float *interleaved_buffer,
unsigned int num_frames,
unsigned int num_channels);

/**
* Clean up the Audio Stream
* instance. Ensure the validity
* of the pointer, it must have
* been create by a `create`
* Destroy the instance for clean up.
*
* Ensure the validity of the pointer, it must have been create by a `create`
*/
void aud_audio_stream_destroy(void *ctx);
void aud_audio_provider_destroy(void *ctx);

#endif /* AUD_LIB_BINDINGS */
7 changes: 5 additions & 2 deletions aud/lib/src/apps/auscope/app.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use crate::audio::*;

pub struct App<AudioReceiver: AudioProviding> {
pub struct App<AudioReceiver> {
audio_buffer: AudioBuffer,
audio_receiver: AudioReceiver,
audio_device: Option<AudioDevice>,
}

impl<AudioReceiver: AudioProviding> App<AudioReceiver> {
impl<AudioReceiver> App<AudioReceiver>
where
AudioReceiver: AudioProviding + AudioInterface,
{
pub fn with_audio_receiver(audio_receiver: AudioReceiver) -> Self {
Self {
audio_buffer: AudioBuffer::default(),
Expand Down
Loading

0 comments on commit 4637a2c

Please sign in to comment.