From 6e37154f3ccb593cb0b5e686be5d6f87cb109b7b Mon Sep 17 00:00:00 2001 From: ramiroaisen <52116153+ramiroaisen@users.noreply.github.com> Date: Mon, 25 Dec 2023 19:10:59 -0300 Subject: [PATCH] feat: add is_external_relay_redirect to media sessions --- defs/db/StreamConnection.ts | 1 + defs/db/StreamConnectionLite.ts | 1 + .../[account]/account-station-item.svelte | 3 -- rs/bin/openstream/src/main.rs | 1 + .../src/main.rs | 1 + rs/packages/api/src/ws_stats/mod.rs | 11 ++++- .../api/src/ws_stats/routes/connection.rs | 5 +++ rs/packages/api/src/ws_stats/routes/mod.rs | 8 ++-- .../src/models/stream_connection/lite/mod.rs | 13 ++++++ .../db/src/models/stream_connection/mod.rs | 3 ++ rs/packages/stream/src/lib.rs | 44 ++++++++++++++++++- 11 files changed, 83 insertions(+), 8 deletions(-) diff --git a/defs/db/StreamConnection.ts b/defs/db/StreamConnection.ts index 227d17cc..23c76a73 100644 --- a/defs/db/StreamConnection.ts +++ b/defs/db/StreamConnection.ts @@ -13,6 +13,7 @@ export type StreamConnection = { created_at: DateTime; country_code: CountryCode | null; ip: string; + is_external_relay_redirect: boolean; request: Request; last_transfer_at: DateTime; closed_at: DateTime | null; diff --git a/defs/db/StreamConnectionLite.ts b/defs/db/StreamConnectionLite.ts index f0076f0d..7bdfadcf 100644 --- a/defs/db/StreamConnectionLite.ts +++ b/defs/db/StreamConnectionLite.ts @@ -14,5 +14,6 @@ export type StreamConnectionLite = { do: string | null; os: string | null; ca: DateTime; + re: boolean; cl: DateTime | null; }; diff --git a/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/account-station-item.svelte b/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/account-station-item.svelte index 9f41b704..89abc171 100644 --- a/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/account-station-item.svelte +++ b/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/account-station-item.svelte @@ -29,9 +29,6 @@ }; } - $: console.log({ merged_now_playing, on_air }); - - let store: ReturnType | null; let unsub: (() => void) | null = null; diff --git a/rs/bin/openstream/src/main.rs b/rs/bin/openstream/src/main.rs index 5e38f0f2..bb89563c 100644 --- a/rs/bin/openstream/src/main.rs +++ b/rs/bin/openstream/src/main.rs @@ -541,6 +541,7 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> { let ws_stats = WsStatsServer::new( deployment.id.clone(), ws_stats_config.addrs.clone(), + drop_tracer.clone(), shutdown.clone(), ); let fut = ws_stats.start()?; diff --git a/rs/internal-scripts/create-random-stream-connections/src/main.rs b/rs/internal-scripts/create-random-stream-connections/src/main.rs index fa05bab7..8df2ee1a 100644 --- a/rs/internal-scripts/create-random-stream-connections/src/main.rs +++ b/rs/internal-scripts/create-random-stream-connections/src/main.rs @@ -178,6 +178,7 @@ async fn create_random_stream_connection( country_code: request.country_code, transfer_bytes, duration_ms, + is_external_relay_redirect: false, created_at: created_at.into(), last_transfer_at: created_at.into(), closed_at, diff --git a/rs/packages/api/src/ws_stats/mod.rs b/rs/packages/api/src/ws_stats/mod.rs index e5857707..a854fe4c 100644 --- a/rs/packages/api/src/ws_stats/mod.rs +++ b/rs/packages/api/src/ws_stats/mod.rs @@ -1,5 +1,6 @@ pub mod routes; +use drop_tracer::DropTracer; use futures::stream::FuturesUnordered; use futures::TryStreamExt; use hyper::Server; @@ -14,6 +15,7 @@ use std::net::SocketAddr; pub struct WsStatsServer { deployment_id: String, addrs: Vec, + drop_tracer: DropTracer, shutdown: Shutdown, } @@ -31,10 +33,16 @@ pub struct Status { } impl WsStatsServer { - pub fn new(deployment_id: String, addrs: Vec, shutdown: Shutdown) -> Self { + pub fn new( + deployment_id: String, + addrs: Vec, + drop_tracer: DropTracer, + shutdown: Shutdown, + ) -> Self { Self { deployment_id, addrs, + drop_tracer, shutdown, } } @@ -49,6 +57,7 @@ impl WsStatsServer { app.at("/").nest(routes::router( self.deployment_id.clone(), + self.drop_tracer.clone(), self.shutdown.clone(), )); diff --git a/rs/packages/api/src/ws_stats/routes/connection.rs b/rs/packages/api/src/ws_stats/routes/connection.rs index 9df3f185..e3edd81f 100644 --- a/rs/packages/api/src/ws_stats/routes/connection.rs +++ b/rs/packages/api/src/ws_stats/routes/connection.rs @@ -1,4 +1,5 @@ use db::{station::Station, ws_stats_connection::WsStatsConnection, Model}; +use drop_tracer::DropTracer; use futures_util::{sink::SinkExt, stream::StreamExt}; use hyper::{Body, StatusCode}; use mongodb::bson::doc; @@ -15,6 +16,7 @@ use ts_rs::TS; #[derive(Debug, Clone)] pub struct WsConnectionHandler { pub deployment_id: String, + pub drop_tracer: DropTracer, pub shutdown: Shutdown, } @@ -124,6 +126,7 @@ impl WsConnectionHandler { let (res, stream_future) = prex::ws::upgrade(&mut req, None)?; + let token = self.drop_tracer.token(); tokio::spawn(async move { let mut stream = match stream_future.await { Ok(stream) => stream, @@ -283,6 +286,8 @@ impl WsConnectionHandler { "CLOSE ws-stats connection {connection_id} for station {station_id} ({reconnections}) in {duration}", duration=FormatDuration(duration_ms), ); + + drop(token) }); Ok(res) diff --git a/rs/packages/api/src/ws_stats/routes/mod.rs b/rs/packages/api/src/ws_stats/routes/mod.rs index 5c69fba3..8fadab61 100644 --- a/rs/packages/api/src/ws_stats/routes/mod.rs +++ b/rs/packages/api/src/ws_stats/routes/mod.rs @@ -1,15 +1,17 @@ pub mod connection; +use drop_tracer::DropTracer; use prex::router::builder::Builder; use shutdown::Shutdown; -pub fn router(deployment_id: String, shutdown: Shutdown) -> Builder { +pub fn router(deployment_id: String, drop_tracer: DropTracer, shutdown: Shutdown) -> Builder { let mut router = prex::prex(); router.get( "/ws/stats/connection", connection::WsConnectionHandler { - deployment_id: deployment_id.clone(), - shutdown: shutdown.clone(), + deployment_id, + drop_tracer, + shutdown, }, ); diff --git a/rs/packages/db/src/models/stream_connection/lite/mod.rs b/rs/packages/db/src/models/stream_connection/lite/mod.rs index 9ce27677..c386868c 100644 --- a/rs/packages/db/src/models/stream_connection/lite/mod.rs +++ b/rs/packages/db/src/models/stream_connection/lite/mod.rs @@ -10,6 +10,12 @@ use ts_rs::TS; crate::register!(StreamConnectionLite); +#[allow(clippy::bool_comparison)] +#[inline(always)] +fn is_false(v: &bool) -> bool { + *v == false +} + #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[ts(export, export_to = "../../../defs/db/")] #[serde(rename_all = "snake_case")] @@ -51,6 +57,11 @@ pub struct StreamConnectionLite { #[serde(rename = "ca")] pub created_at: DateTime, + #[serde(rename = "re")] + #[serde(default)] + #[serde(skip_serializing_if = "is_false")] + pub is_external_relay_redirect: bool, + #[serde(rename = "cl")] pub closed_at: Option, } @@ -80,6 +91,7 @@ impl StreamConnectionLite { domain: Self::get_domain(full), duration_ms: full.duration_ms, transfer_bytes: full.transfer_bytes, + is_external_relay_redirect: full.is_external_relay_redirect, created_at: full.created_at, closed_at: full.closed_at, } @@ -99,6 +111,7 @@ impl From for StreamConnectionLite { duration_ms: full.duration_ms, transfer_bytes: full.transfer_bytes, country_code: full.country_code, + is_external_relay_redirect: full.is_external_relay_redirect, created_at: full.created_at, closed_at: full.closed_at, } diff --git a/rs/packages/db/src/models/stream_connection/mod.rs b/rs/packages/db/src/models/stream_connection/mod.rs index 58928fc9..141f7559 100644 --- a/rs/packages/db/src/models/stream_connection/mod.rs +++ b/rs/packages/db/src/models/stream_connection/mod.rs @@ -34,6 +34,9 @@ pub struct StreamConnection { #[serde(with = "serde_util::ip")] pub ip: IpAddr, + #[serde(default)] + pub is_external_relay_redirect: bool, + pub request: Request, pub last_transfer_at: DateTime, pub closed_at: Option, diff --git a/rs/packages/stream/src/lib.rs b/rs/packages/stream/src/lib.rs index ee9bcb81..4ec0c0b1 100644 --- a/rs/packages/stream/src/lib.rs +++ b/rs/packages/stream/src/lib.rs @@ -378,7 +378,48 @@ impl StreamHandler { }; // TODO: check account limits - let rx = media_sessions.subscribe(&station.id).await?; + let rx = match media_sessions.subscribe(&station.id).await { + Ok(rx) => rx, + Err(e) => { + if matches!(e, SubscribeError::ExternalRelayRedirect(_)) { + let token = drop_tracer.token(); + tokio::spawn(async move { + // create redirect session + let now = DateTime::now(); + let doc = { + let request = db::http::Request::from_http(&req); + + StreamConnection { + id: StreamConnection::uid(), + station_id: station.id.clone(), + deployment_id: deployment_id.clone(), + is_open: true, + ip: request.real_ip, + country_code: request.country_code, + transfer_bytes: Some(0), + duration_ms: Some(0), + request, + is_external_relay_redirect: true, + created_at: now, + last_transfer_at: now, + closed_at: Some(now), + } + }; + + let doc_lite = StreamConnectionLite::from_stream_connection_ref(&doc); + + let insert_doc = StreamConnection::insert(doc); + let insert_doc_lite = StreamConnectionLite::insert(doc_lite); + + let (_, _) = tokio::join!(insert_doc, insert_doc_lite); + + drop(token); + }); + } + + return Err(e.into()); + } + }; let content_type = rx.content_type().to_string(); let is_mp3 = content_type == "audio/mpeg"; @@ -398,6 +439,7 @@ impl StreamHandler { duration_ms: None, request, created_at: now, + is_external_relay_redirect: false, last_transfer_at: now, closed_at: None, }