Skip to content

Commit

Permalink
feat: add external relay copycodec for aac and mp3 (#253)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Jan 17, 2024
2 parents 2accc1e + 873d73e commit 339efe2
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 55 deletions.
38 changes: 38 additions & 0 deletions rs/packages/db/src/models/probe/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::Model;
use mongodb::{bson::doc, options::FindOneOptions, IndexModel};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_util::DateTime;
use ts_rs::TS;

Expand Down Expand Up @@ -37,11 +38,48 @@ impl Probe {
let options = FindOneOptions::builder().sort(sort).build();
Self::cl().find_one(filter, options).await
}

pub fn streams(&self) -> Vec<(Option<String>, Option<usize>)> {
match &self.result {
ProbeResult::Error { .. } => vec![],

ProbeResult::Ok { document } => {
let streams = match document.get("streams") {
Some(streams) => streams,
None => return vec![],
};

match streams {
Value::Array(streams) => streams
.iter()
.map(|stream| {
let codec = stream["codec_name"].as_str().map(ToString::to_string);

let bit_rate = match stream["bit_rate"]
.as_str()
.and_then(|s| s.parse::<usize>().ok())
{
Some(n) => Some(n),
None => stream["tags"]["variant_bitrate"]
.as_str()
.and_then(|s| s.parse::<usize>().ok()),
};

(codec, bit_rate)
})
.collect(),

_ => vec![],
}
}
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[serde(tag = "r", rename_all = "snake_case")]
#[ts(export, export_to = "../../../defs/db/")]
#[macros::keys]
pub enum ProbeResult {
Ok {
#[ts(type = "Record<string, any>")]
Expand Down
69 changes: 35 additions & 34 deletions rs/packages/ffmpeg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,23 +209,31 @@ impl Ffmpeg {
Some(input) => cmd.arg(input),
};

if !self.config.headers.is_empty() {
let v = self
.config
.headers
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
.collect::<Vec<_>>()
.join("\r\n");

cmd.arg("-headers");
cmd.arg(v);
}

// copy codec
if self.config.copycodec {
cmd.arg("-c:a");
cmd.arg("copy");
} else {
// bitrate
cmd.arg("-ab");
cmd.arg(format!("{}k", self.config.kbitrate));

cmd.arg("-minrate");
cmd.arg(format!("{}k", self.config.kbitrate));

cmd.arg("-maxrate");
cmd.arg(format!("{}k", self.config.kbitrate));

cmd.arg("-bufsize");
cmd.arg(format!("{}k", self.config.kbitrate));

// channels
cmd.arg("-ac");
cmd.arg(self.config.channels.to_string());

// frequency
cmd.arg("-ar");
cmd.arg(self.config.freq.to_string());
}

// format
Expand All @@ -237,27 +245,6 @@ impl Ffmpeg {
cmd.arg("-vn");
}

// channels
cmd.arg("-ac");
cmd.arg(self.config.channels.to_string());

// frequency
cmd.arg("-ar");
cmd.arg(self.config.freq.to_string());

// bitrate
cmd.arg("-ab");
cmd.arg(format!("{}k", self.config.kbitrate));

cmd.arg("-minrate");
cmd.arg(format!("{}k", self.config.kbitrate));

cmd.arg("-maxrate");
cmd.arg(format!("{}k", self.config.kbitrate));

cmd.arg("-bufsize");
cmd.arg(format!("{}k", self.config.kbitrate));

// threads
cmd.arg("-threads");
cmd.arg(self.config.threads.to_string());
Expand All @@ -266,6 +253,20 @@ impl Ffmpeg {
cmd.arg("-loglevel");
cmd.arg(self.config.loglevel.as_str());

// headers
if !self.config.headers.is_empty() {
let v = self
.config
.headers
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
.collect::<Vec<_>>()
.join("\r\n");

cmd.arg("-headers");
cmd.arg(v);
}

// output
cmd.arg("-");

Expand Down
51 changes: 42 additions & 9 deletions rs/packages/media/src/handle/external_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::{Duration, Instant};
use db::media_session::MediaSession;
use db::{media_session::MediaSessionState, Model};
use drop_tracer::{DropTracer, Token};
use ffmpeg::{Ffmpeg, FfmpegConfig, FfmpegSpawn};
use ffmpeg::{Ffmpeg, FfmpegConfig, FfmpegSpawn, Format};
use futures_util::StreamExt;
use log::*;
use mongodb::bson::doc;
Expand All @@ -17,19 +17,22 @@ use tokio::task::JoinHandle;

use crate::channel::{SendError, Sender};
use crate::handle::util::PrettyDuration;
use crate::ProbeCodec;

use constants::{
EXTERNAL_RELAY_NO_DATA_SHUTDOWN_SECS, EXTERNAL_RELAY_NO_DATA_START_SHUTDOWN_SECS,
EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS, /*STREAM_BURST_LENGTH,*/
STREAM_BURST_LENGTH, STREAM_CHUNK_SIZE, STREAM_KBITRATE,
};

#[allow(clippy::too_many_arguments)]
pub fn run_external_relay_source(
sender: Sender,
deployment_id: String,
task_id: String,
station_id: String,
url: String,
codec_info: Option<(ProbeCodec, usize)>,
drop_tracer: DropTracer,
shutdown: Shutdown,
) -> JoinHandle<Result<(), ExternalRelayError>> {
Expand Down Expand Up @@ -81,13 +84,43 @@ pub fn run_external_relay_source(
let fut = async move {
let headers = ffmpeg::headers_for_url(&url);

let ffmpeg_config = FfmpegConfig {
input: Some(url),
kbitrate: STREAM_KBITRATE,
readrate: true,
readrate_initial_burst: STREAM_BURST_LENGTH as f64,
headers,
..FfmpegConfig::default()
let (ffmpeg_config, chunk_size) = match codec_info {
Some((codec, bitrate)) => {
let format = match codec {
ProbeCodec::Mp3 => Format::MP3,
ProbeCodec::Aac => Format::AAC,
};

let config = FfmpegConfig {
input: Some(url),
kbitrate: bitrate / 1000,
readrate: true,
readrate_initial_burst: STREAM_BURST_LENGTH as f64,
copycodec: true,
headers,
format,
..FfmpegConfig::default()
};

let chunk_size = STREAM_CHUNK_SIZE;

(config, chunk_size)
}

None => {
let config = FfmpegConfig {
input: Some(url),
kbitrate: STREAM_KBITRATE,
readrate: true,
readrate_initial_burst: STREAM_BURST_LENGTH as f64,
headers,
..FfmpegConfig::default()
};

let chunk_size = STREAM_CHUNK_SIZE;

(config, chunk_size)
}
};

let ff_spawn = match Ffmpeg::new(ffmpeg_config).spawn() {
Expand Down Expand Up @@ -121,7 +154,7 @@ pub fn run_external_relay_source(
use stream_util::*;
use tokio::time::sleep;

let mut chunks = stdout.into_bytes_stream(STREAM_CHUNK_SIZE);
let mut chunks = stdout.into_bytes_stream(chunk_size);

let mut no_listeners_since: Option<Instant> = None;

Expand Down
99 changes: 87 additions & 12 deletions rs/packages/media/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod health;
use constants::MEDIA_LOCK_TIMEOUT_SECS;
use db::{
audio_file::AudioFile,
probe::{Probe, ProbeResult},
run_transaction,
station::{OwnerDeploymentInfo, Station},
Model,
Expand All @@ -26,6 +27,12 @@ use channel::{Receiver, Sender};
use handle::internal_relay::GetInternalRelayError;
use handle::{get_internal_relay_source, run_external_relay_source, run_playlist_source};

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ProbeCodec {
Mp3,
Aac,
}

#[derive(Debug)]
pub struct Handle {
sender: Sender,
Expand Down Expand Up @@ -270,25 +277,95 @@ impl MediaSessionMap {
match &*lock {
Some(handle) => Ok(handle.sender.subscribe()),
None => {
{
// external relay redirect
let station = Station::get_by_id(station_id).await?;
if let Some(station) = station {
if let Some(url) = station.external_relay_url {
// external relay redirect
let station: Station;
match Station::get_by_id(station_id).await? {
None => return Err(SubscribeError::StationNotFound(station_id.to_string())),
Some(s) => {
station = s;
if let Some(url) = &station.external_relay_url {
if station.external_relay_redirect {
return Err(SubscribeError::ExternalRelayRedirect(url));
return Err(SubscribeError::ExternalRelayRedirect(url.clone()));
}
}
}
}

let probe_result: Option<(ProbeCodec, usize)>;

if station.external_relay_url.is_some() {
let filter = doc! {
ProbeResult::KEY_ENUM_TAG: ProbeResult::KEY_ENUM_VARIANT_OK,
Probe::KEY_STATION_ID: station_id,
};

let sort = doc! {
Probe::KEY_CREATED_AT: -1
};

let options = mongodb::options::FindOneOptions::builder()
.sort(sort)
.build();

let last_probe = Probe::cl().find_one(filter, options).await?;

match last_probe {
None => {
probe_result = None;
}

Some(doc) => {
let mut streams = doc
.streams()
.into_iter()
.filter_map(|(codec, bitrate)| match codec?.as_ref() {
"mp3" => Some((ProbeCodec::Mp3, bitrate)),
"aac" => Some((ProbeCodec::Aac, bitrate)),
_ => None,
})
.collect::<Vec<(ProbeCodec, Option<usize>)>>();

streams.sort_by(|(_, br1), (_, br2)| {
use std::cmp::Ordering;

match (br1, br2) {
(Some(br1), Some(br2)) => br2.cmp(br1),
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(None, None) => Ordering::Equal,
}
});

let first = streams.first();

match first {
None => probe_result = None,
Some((codec, br)) => {
let bitrate = br.unwrap_or(128_000).max(64_000).min(320_000);
probe_result = Some((*codec, bitrate));
}
}
}
}
} else {
probe_result = None;
}

let task_id = Station::random_owner_task_id();

let map_entry_release =
MapEntryRelease::new(station_id.to_string(), task_id.clone(), self.clone());

let content_type = match probe_result {
None => "audio/mpeg".to_string(),
Some((codec, _)) => match codec {
ProbeCodec::Mp3 => "audio/mpeg".to_string(),
ProbeCodec::Aac => "audio/aac".to_string(),
},
};

let owner_deployment_info = OwnerDeploymentInfo {
content_type: "audio/mpeg".to_string(),
content_type: content_type.clone(),
deployment_id: self.deployment_id.clone(),
task_id: task_id.clone(),
health_checked_at: Some(DateTime::now()),
Expand Down Expand Up @@ -337,11 +414,7 @@ impl MediaSessionMap {
match station.external_relay_url {
// 1) external relay
Some(url) => {
info = Info::new(
Kind::ExternalRelay,
task_id.clone(),
"audio/mpeg".to_string(),
);
info = Info::new(Kind::ExternalRelay, task_id.clone(), content_type);
sender = Sender::new(station_id.to_string(), info);

{
Expand All @@ -351,13 +424,15 @@ impl MediaSessionMap {
let station_id = station_id.to_string();
let drop_tracer = self.drop_tracer.clone();
let shutdown = self.shutdown.clone();
let codec_info = probe_result;
tokio::spawn(async move {
let _ = run_external_relay_source(
sender,
deployment_id,
task_id,
station_id,
url,
codec_info,
drop_tracer,
shutdown,
)
Expand Down

0 comments on commit 339efe2

Please sign in to comment.