diff --git a/defs/db/Station.ts b/defs/db/Station.ts index 2b4f0698..21ee9d24 100644 --- a/defs/db/Station.ts +++ b/defs/db/Station.ts @@ -36,11 +36,11 @@ export type Station = { app_store_url: string | null; user_metadata: Metadata; system_metadata: Metadata; - owner_deployment_info: OwnerDeploymentInfo | null; - playlist_is_randomly_shuffled: boolean; external_relay_url: string | null; source_password: string; + owner_deployment_info: OwnerDeploymentInfo | null; last_external_relay_probe_started_at: DateTime | null; + playlist_is_randomly_shuffled: boolean; created_at: DateTime; updated_at: DateTime; deleted_at: DateTime | null; diff --git a/rs/packages/api/src/ws_stats/routes/connection.rs b/rs/packages/api/src/ws_stats/routes/connection.rs index 8c1ff451..3c4a08f6 100644 --- a/rs/packages/api/src/ws_stats/routes/connection.rs +++ b/rs/packages/api/src/ws_stats/routes/connection.rs @@ -1,4 +1,4 @@ -use db::{ws_stats_connection::WsStatsConnection, Model}; +use db::{station::Station, ws_stats_connection::WsStatsConnection, Model}; use futures_util::{sink::SinkExt, stream::StreamExt}; use hyper::{Body, StatusCode}; use mongodb::bson::doc; @@ -20,6 +20,12 @@ pub struct WsConnectionHandler { #[derive(Debug, thiserror::Error)] pub enum WsConnectionHandlerError { + #[error("db: {0}")] + Db(#[from] mongodb::error::Error), + + #[error("station with id {0} not found")] + StationNotFound(String), + #[error("expecting websocket request")] NotWs, @@ -32,14 +38,19 @@ pub enum WsConnectionHandlerError { impl From for Response { fn from(err: WsConnectionHandlerError) -> Self { - let body = Body::from(format!("{}", err)); - - let status = match err { + let status = match &err { + WsConnectionHandlerError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR, + WsConnectionHandlerError::StationNotFound(_) => StatusCode::BAD_REQUEST, WsConnectionHandlerError::NotWs => StatusCode::BAD_REQUEST, WsConnectionHandlerError::InvalidQs(_) => StatusCode::BAD_REQUEST, WsConnectionHandlerError::ProtocolError(_) => StatusCode::BAD_REQUEST, }; + let body = match &err { + WsConnectionHandlerError::Db(_) => Body::from("internal server error (db)"), + _ => Body::from(format!("{}", err)), + }; + let mut res = Response::new(status); *res.body_mut() = body; @@ -107,6 +118,10 @@ impl WsConnectionHandler { let ip = req.isomorphic_ip(); let country_code = geoip::ip_to_country_code(&ip); + if !Station::exists(qs.station_id.clone()).await? { + return Err(WsConnectionHandlerError::StationNotFound(qs.station_id)); + } + let (res, stream_future) = prex::ws::upgrade(&mut req, None)?; tokio::spawn(async move { @@ -125,6 +140,7 @@ impl WsConnectionHandler { app_version, } = qs; + let reconnections: u16; let connection_id: String; let created_at: DateTime; @@ -132,6 +148,7 @@ impl WsConnectionHandler { () => {{ connection_id = WsStatsConnection::uid(); created_at = DateTime::now(); + reconnections = 0; let connection = WsStatsConnection { id: connection_id.clone(), @@ -143,7 +160,7 @@ impl WsConnectionHandler { ip, app_kind: app_kind.clone(), app_version, - reconnections: 0, + reconnections, created_at, closed_at: None, }; @@ -182,6 +199,7 @@ impl WsConnectionHandler { Ok(None) => create!(), Ok(Some(connection)) => { + reconnections = connection.reconnections; connection_id = connection.id; created_at = connection.created_at; } @@ -189,22 +207,31 @@ impl WsConnectionHandler { } } - 'start: { - let start_message = serde_json::to_string(&ServerEvent::Start { - connection_id: connection_id.clone(), - }) - .unwrap(); - - let r = tokio::select! { - _ = shutdown.signal() => break 'start, - r = stream.send(Message::text(start_message)) => r - }; + log::info!( + target: "ws-stats", + "OPEN ws-stats connection {connection_id} for station {station_id} ({reconnections})" + ); - match r { - Ok(_) => {} - _ => break 'start, + 'start: { + macro_rules! send { + ($event:expr) => {{ + let event = serde_json::to_string(&$event).unwrap(); + let r = tokio::select! { + _ = shutdown.signal() => break 'start, + r = stream.send(Message::text(event)) => r + }; + + match r { + Ok(_) => {} + _ => break 'start, + } + }}; } + send!(ServerEvent::Start { + connection_id: connection_id.clone(), + }); + 'messages: loop { let msg = tokio::select! { _ = shutdown.signal() => { @@ -229,16 +256,7 @@ impl WsConnectionHandler { match event { ClientEvent::Pong => {} ClientEvent::Ping => { - let pong = serde_json::to_string(&ServerEvent::Pong).unwrap(); - let r = tokio::select! { - _ = shutdown.signal() => break 'messages, - r = stream.send(Message::text(pong)) => r - }; - - match r { - Ok(_) => {} - _ => break 'messages, - } + send!(ServerEvent::Pong); } } } @@ -259,6 +277,11 @@ impl WsConnectionHandler { }; let _ = WsStatsConnection::update_by_id(&connection_id, update).await; + + log::info!( + target: "ws-stats", + "CLOSE ws-stats connection {connection_id} for station {station_id} ({reconnections})" + ); }); Ok(res) @@ -268,6 +291,23 @@ impl WsConnectionHandler { #[async_trait::async_trait] impl Handler for WsConnectionHandler { async fn call(&self, req: Request, _: Next) -> Response { - self.handle(req).await.into() + let r = self.handle(req).await; + match &r { + Ok(_) => { + log::info!( + target: "ws-stats", + "ws connections handle ok" + ) + } + + Err(err) => { + log::warn!( + target: "ws-stats", + "ws connections handle err: {}", + err + ) + } + } + r.into() } } diff --git a/rs/packages/db/src/models/station/mod.rs b/rs/packages/db/src/models/station/mod.rs index faff623b..5a63c0b1 100644 --- a/rs/packages/db/src/models/station/mod.rs +++ b/rs/packages/db/src/models/station/mod.rs @@ -254,18 +254,16 @@ pub struct Station { pub user_metadata: Metadata, pub system_metadata: Metadata, - // runtime - pub owner_deployment_info: Option, - - // misc - pub playlist_is_randomly_shuffled: bool, + // external-relay pub external_relay_url: Option, // auth pub source_password: String, - // external relay probe + // runtime + pub owner_deployment_info: Option, pub last_external_relay_probe_started_at: Option, + pub playlist_is_randomly_shuffled: bool, pub created_at: DateTime, pub updated_at: DateTime,