Skip to content

Commit

Permalink
feat: add is_external_relay_redirect to media sessions (#236)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Dec 25, 2023
2 parents c3730c3 + 6e37154 commit 4f632b3
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 8 deletions.
1 change: 1 addition & 0 deletions defs/db/StreamConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export type StreamConnection = {
created_at: DateTime;
country_code: CountryCode | null;
ip: string;
is_external_relay_redirect: boolean;
request: Request;
last_transfer_at: DateTime;
closed_at: DateTime | null;
Expand Down
1 change: 1 addition & 0 deletions defs/db/StreamConnectionLite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ export type StreamConnectionLite = {
do: string | null;
os: string | null;
ca: DateTime;
re: boolean;
cl: DateTime | null;
};
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
};
}
$: console.log({ merged_now_playing, on_air });
let store: ReturnType<typeof get_now_playing_store> | null;
let unsub: (() => void) | null = null;
Expand Down
1 change: 1 addition & 0 deletions rs/bin/openstream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> {
let ws_stats = WsStatsServer::new(
deployment.id.clone(),
ws_stats_config.addrs.clone(),
drop_tracer.clone(),
shutdown.clone(),
);
let fut = ws_stats.start()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ async fn create_random_stream_connection(
country_code: request.country_code,
transfer_bytes,
duration_ms,
is_external_relay_redirect: false,
created_at: created_at.into(),
last_transfer_at: created_at.into(),
closed_at,
Expand Down
11 changes: 10 additions & 1 deletion rs/packages/api/src/ws_stats/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod routes;

use drop_tracer::DropTracer;
use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use hyper::Server;
Expand All @@ -14,6 +15,7 @@ use std::net::SocketAddr;
pub struct WsStatsServer {
deployment_id: String,
addrs: Vec<SocketAddr>,
drop_tracer: DropTracer,
shutdown: Shutdown,
}

Expand All @@ -31,10 +33,16 @@ pub struct Status {
}

impl WsStatsServer {
pub fn new(deployment_id: String, addrs: Vec<SocketAddr>, shutdown: Shutdown) -> Self {
pub fn new(
deployment_id: String,
addrs: Vec<SocketAddr>,
drop_tracer: DropTracer,
shutdown: Shutdown,
) -> Self {
Self {
deployment_id,
addrs,
drop_tracer,
shutdown,
}
}
Expand All @@ -49,6 +57,7 @@ impl WsStatsServer {

app.at("/").nest(routes::router(
self.deployment_id.clone(),
self.drop_tracer.clone(),
self.shutdown.clone(),
));

Expand Down
5 changes: 5 additions & 0 deletions rs/packages/api/src/ws_stats/routes/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use db::{station::Station, ws_stats_connection::WsStatsConnection, Model};
use drop_tracer::DropTracer;
use futures_util::{sink::SinkExt, stream::StreamExt};
use hyper::{Body, StatusCode};
use mongodb::bson::doc;
Expand All @@ -15,6 +16,7 @@ use ts_rs::TS;
#[derive(Debug, Clone)]
pub struct WsConnectionHandler {
pub deployment_id: String,
pub drop_tracer: DropTracer,
pub shutdown: Shutdown,
}

Expand Down Expand Up @@ -124,6 +126,7 @@ impl WsConnectionHandler {

let (res, stream_future) = prex::ws::upgrade(&mut req, None)?;

let token = self.drop_tracer.token();
tokio::spawn(async move {
let mut stream = match stream_future.await {
Ok(stream) => stream,
Expand Down Expand Up @@ -283,6 +286,8 @@ impl WsConnectionHandler {
"CLOSE ws-stats connection {connection_id} for station {station_id} ({reconnections}) in {duration}",
duration=FormatDuration(duration_ms),
);

drop(token)
});

Ok(res)
Expand Down
8 changes: 5 additions & 3 deletions rs/packages/api/src/ws_stats/routes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
pub mod connection;
use drop_tracer::DropTracer;
use prex::router::builder::Builder;
use shutdown::Shutdown;

pub fn router(deployment_id: String, shutdown: Shutdown) -> Builder {
pub fn router(deployment_id: String, drop_tracer: DropTracer, shutdown: Shutdown) -> Builder {
let mut router = prex::prex();

router.get(
"/ws/stats/connection",
connection::WsConnectionHandler {
deployment_id: deployment_id.clone(),
shutdown: shutdown.clone(),
deployment_id,
drop_tracer,
shutdown,
},
);

Expand Down
13 changes: 13 additions & 0 deletions rs/packages/db/src/models/stream_connection/lite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ use ts_rs::TS;

crate::register!(StreamConnectionLite);

#[allow(clippy::bool_comparison)]
#[inline(always)]
fn is_false(v: &bool) -> bool {
*v == false
}

#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[ts(export, export_to = "../../../defs/db/")]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -51,6 +57,11 @@ pub struct StreamConnectionLite {
#[serde(rename = "ca")]
pub created_at: DateTime,

#[serde(rename = "re")]
#[serde(default)]
#[serde(skip_serializing_if = "is_false")]
pub is_external_relay_redirect: bool,

#[serde(rename = "cl")]
pub closed_at: Option<DateTime>,
}
Expand Down Expand Up @@ -80,6 +91,7 @@ impl StreamConnectionLite {
domain: Self::get_domain(full),
duration_ms: full.duration_ms,
transfer_bytes: full.transfer_bytes,
is_external_relay_redirect: full.is_external_relay_redirect,
created_at: full.created_at,
closed_at: full.closed_at,
}
Expand All @@ -99,6 +111,7 @@ impl From<StreamConnection> for StreamConnectionLite {
duration_ms: full.duration_ms,
transfer_bytes: full.transfer_bytes,
country_code: full.country_code,
is_external_relay_redirect: full.is_external_relay_redirect,
created_at: full.created_at,
closed_at: full.closed_at,
}
Expand Down
3 changes: 3 additions & 0 deletions rs/packages/db/src/models/stream_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub struct StreamConnection {
#[serde(with = "serde_util::ip")]
pub ip: IpAddr,

#[serde(default)]
pub is_external_relay_redirect: bool,

pub request: Request,
pub last_transfer_at: DateTime,
pub closed_at: Option<DateTime>,
Expand Down
44 changes: 43 additions & 1 deletion rs/packages/stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,48 @@ impl StreamHandler {
};

// TODO: check account limits
let rx = media_sessions.subscribe(&station.id).await?;
let rx = match media_sessions.subscribe(&station.id).await {
Ok(rx) => rx,
Err(e) => {
if matches!(e, SubscribeError::ExternalRelayRedirect(_)) {
let token = drop_tracer.token();
tokio::spawn(async move {
// create redirect session
let now = DateTime::now();
let doc = {
let request = db::http::Request::from_http(&req);

StreamConnection {
id: StreamConnection::uid(),
station_id: station.id.clone(),
deployment_id: deployment_id.clone(),
is_open: true,
ip: request.real_ip,
country_code: request.country_code,
transfer_bytes: Some(0),
duration_ms: Some(0),
request,
is_external_relay_redirect: true,
created_at: now,
last_transfer_at: now,
closed_at: Some(now),
}
};

let doc_lite = StreamConnectionLite::from_stream_connection_ref(&doc);

let insert_doc = StreamConnection::insert(doc);
let insert_doc_lite = StreamConnectionLite::insert(doc_lite);

let (_, _) = tokio::join!(insert_doc, insert_doc_lite);

drop(token);
});
}

return Err(e.into());
}
};
let content_type = rx.content_type().to_string();

let is_mp3 = content_type == "audio/mpeg";
Expand All @@ -398,6 +439,7 @@ impl StreamHandler {
duration_ms: None,
request,
created_at: now,
is_external_relay_redirect: false,
last_transfer_at: now,
closed_at: None,
}
Expand Down

0 comments on commit 4f632b3

Please sign in to comment.