diff --git a/defs/api/stream-connections/GET/Output.schema.json b/defs/api/stream-connections/GET/Output.schema.json index 8e886e47..e6511173 100644 --- a/defs/api/stream-connections/GET/Output.schema.json +++ b/defs/api/stream-connections/GET/Output.schema.json @@ -328,6 +328,9 @@ "default": false, "type": "boolean" }, + "_manually_closed": { + "type": "boolean" + }, "request": { "type": "object", "required": [ diff --git a/defs/db/StreamConnection.ts b/defs/db/StreamConnection.ts index 6e6a0064..ad07a696 100644 --- a/defs/db/StreamConnection.ts +++ b/defs/db/StreamConnection.ts @@ -14,6 +14,7 @@ export type StreamConnection = { country_code: CountryCode | null; ip: string; is_external_relay_redirect: boolean; + _manually_closed: boolean; request: Request; last_transfer_at: DateTime; closed_at: DateTime | null; diff --git a/defs/db/StreamConnectionLite.ts b/defs/db/StreamConnectionLite.ts index 72990592..42d929a7 100644 --- a/defs/db/StreamConnectionLite.ts +++ b/defs/db/StreamConnectionLite.ts @@ -15,5 +15,6 @@ export type StreamConnectionLite = { os: string | null; ca: DateTime; re: boolean; + _m: boolean; cl: DateTime | null; }; diff --git a/rs/packages/db/src/models/stream_connection/analytics/mod.rs b/rs/packages/db/src/models/stream_connection/analytics/mod.rs index 4c2aa72e..cb61e411 100644 --- a/rs/packages/db/src/models/stream_connection/analytics/mod.rs +++ b/rs/packages/db/src/models/stream_connection/analytics/mod.rs @@ -16,6 +16,93 @@ use ts_rs::TS; use crate::{station::Station, stream_connection::lite::StreamConnectionLite, Model}; +#[derive(Debug, Serialize, Deserialize)] +#[macros::keys] +pub struct Item { + // #[serde(rename = "_id")] + // pub id: String, + #[serde(rename = "st")] + pub station_id: String, + + //#[serde(rename = "op")] + //pub is_open: bool, + #[serde(rename = "ip")] + #[serde(with = "serde_util::ip")] + pub ip: IpAddr, + + #[serde(rename = "cc")] + pub country_code: Option, + + #[serde(rename = "du")] + #[serde(with = "serde_util::as_f64::option")] + pub duration_ms: Option, + + #[serde(rename = "by")] + #[serde(with = "serde_util::as_f64::option")] + pub transfer_bytes: Option, + + #[serde(rename = "br")] + pub browser: Option, + + #[serde(rename = "do")] + pub domain: Option, + + #[serde(rename = "os")] + pub os: Option, + + #[serde(rename = "ca")] + pub created_at: DateTime, + // #[serde(rename = "re")] + // #[serde(default)] + // #[serde(skip_serializing_if = "is_false")] + // pub is_external_relay_redirect: bool, + + // #[serde(rename = "_m")] + // #[serde(default)] + // #[serde(skip_serializing_if = "is_false")] + // pub manually_closed: bool, + + // #[serde(rename = "cl")] + // pub closed_at: Option, +} + +impl Item { + fn projection() -> mongodb::bson::Document { + doc! { + crate::KEY_ID: 0, + Item::KEY_STATION_ID: 1, + Item::KEY_IP: 1, + Item::KEY_COUNTRY_CODE: 1, + Item::KEY_DURATION_MS: 1, + Item::KEY_TRANSFER_BYTES: 1, + Item::KEY_BROWSER: 1, + Item::KEY_DOMAIN: 1, + Item::KEY_OS: 1, + Item::KEY_CREATED_AT: 1, + } + } +} + +#[cfg(test)] +#[test] +fn stream_connection_analytics_item_keys() { + assert_eq!(StreamConnectionLite::KEY_STATION_ID, Item::KEY_STATION_ID); + assert_eq!(StreamConnectionLite::KEY_IP, Item::KEY_IP); + assert_eq!( + StreamConnectionLite::KEY_COUNTRY_CODE, + Item::KEY_COUNTRY_CODE + ); + assert_eq!(StreamConnectionLite::KEY_DURATION_MS, Item::KEY_DURATION_MS); + assert_eq!( + StreamConnectionLite::KEY_TRANSFER_BYTES, + Item::KEY_TRANSFER_BYTES + ); + assert_eq!(StreamConnectionLite::KEY_BROWSER, Item::KEY_BROWSER); + assert_eq!(StreamConnectionLite::KEY_DOMAIN, Item::KEY_DOMAIN); + assert_eq!(StreamConnectionLite::KEY_OS, Item::KEY_OS); + assert_eq!(StreamConnectionLite::KEY_CREATED_AT, Item::KEY_CREATED_AT); +} + #[derive(Debug, Clone, Serialize, Deserialize, TS, JsonSchema)] #[ts(export, export_to = "../../../defs/analytics/")] pub struct Analytics { @@ -273,7 +360,7 @@ impl Batch { } #[inline(always)] - pub fn add(&mut self, conn: StreamConnectionLite) { + pub fn add(&mut self, conn: Item) { let created_at = conn.created_at.to_offset(self.offset); let conn_duration_ms = conn @@ -303,8 +390,9 @@ impl Batch { #[cfg(feature = "analytics-max-concurrent")] let stop = StartStopEvent::new(start_s + (conn_duration_ms / 1000) as u32, false); - if !conn.is_open { - #[cfg(feature = "analytics-max-concurrent")] + // duration_ms is None if the connection is still open + #[cfg(feature = "analytics-max-concurrent")] + if conn.duration_ms.is_some() { self.start_stop_events.push(stop); } @@ -320,7 +408,7 @@ impl Batch { item.start_stop_events.push(start); #[cfg(feature = "analytics-max-concurrent")] - if !conn.is_open { + if conn.duration_ms.is_some() { item.start_stop_events.push(stop); } }; @@ -625,10 +713,15 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result(); - let options = mongodb::options::FindOptions::builder().sort(sort).build(); + let options = mongodb::options::FindOptions::builder() + .sort(sort) + .projection(Item::projection()) + .build(); let filter = doc! { "$and": and }; - let mut cursor = StreamConnectionLite::cl().find(filter, options).await?; + let mut cursor = StreamConnectionLite::cl_as::() + .find(filter, options) + .await?; let mut batch = Batch::new(offset_date.offset()); diff --git a/rs/packages/db/src/models/stream_connection/app_analytics/mod.rs b/rs/packages/db/src/models/stream_connection/app_analytics/mod.rs index ec1ebaa0..ae560573 100644 --- a/rs/packages/db/src/models/stream_connection/app_analytics/mod.rs +++ b/rs/packages/db/src/models/stream_connection/app_analytics/mod.rs @@ -16,6 +16,78 @@ use ts_rs::TS; use crate::{station::Station, ws_stats_connection::WsStatsConnection, Model}; +#[derive(Debug, Serialize, Deserialize)] +#[macros::keys] +pub struct Item { + // #[serde(rename = "_id")] + // pub id: String, + #[serde(rename = "st")] + pub station_id: String, + + // #[serde(rename = "dp")] + // pub deployment_id: String, + + // #[serde(with = "serde_util::as_f64::option")] + // pub transfer_bytes: Option, + #[serde(rename = "du")] + #[serde(with = "serde_util::as_f64::option")] + pub duration_ms: Option, + + // #[serde(rename = "op")] + // pub is_open: bool, + #[serde(rename = "cc")] + pub country_code: Option, + + #[serde(rename = "ip")] + #[serde(with = "serde_util::ip")] + pub ip: IpAddr, + + #[serde(rename = "ap")] + pub app_kind: Option, + + #[serde(rename = "av")] + #[serde(with = "serde_util::as_f64::option")] + pub app_version: Option, + + // #[serde(rename = "re")] + // #[serde(with = "serde_util::as_f64")] + // pub reconnections: u16, + #[serde(rename = "ca")] + pub created_at: DateTime, + // pub request: Request, + // pub last_transfer_at: DateTime, + + // #[serde(rename = "cl")] + // pub closed_at: Option, +} + +impl Item { + pub fn projection() -> mongodb::bson::Document { + doc! { + crate::KEY_ID: 0, + WsStatsConnection::KEY_STATION_ID: 1, + WsStatsConnection::KEY_DURATION_MS: 1, + WsStatsConnection::KEY_COUNTRY_CODE: 1, + WsStatsConnection::KEY_IP: 1, + WsStatsConnection::KEY_APP_KIND: 1, + WsStatsConnection::KEY_APP_VERSION: 1, + WsStatsConnection::KEY_CREATED_AT: 1, + } + } +} + +#[cfg(test)] +#[test] +fn ws_stat_item_keys() { + assert_eq!(Item::KEY_STATION_ID, WsStatsConnection::KEY_STATION_ID); + assert_eq!(Item::KEY_DURATION_MS, WsStatsConnection::KEY_DURATION_MS); + assert_eq!(Item::KEY_COUNTRY_CODE, WsStatsConnection::KEY_COUNTRY_CODE); + assert_eq!(Item::KEY_IP, WsStatsConnection::KEY_IP); + assert_eq!(Item::KEY_APP_KIND, WsStatsConnection::KEY_APP_KIND); + assert_eq!(Item::KEY_APP_VERSION, WsStatsConnection::KEY_APP_VERSION); + assert_eq!(Item::KEY_CREATED_AT, WsStatsConnection::KEY_CREATED_AT); +} + #[derive(Debug, Clone, Serialize, Deserialize, TS, JsonSchema)] #[ts(export, export_to = "../../../defs/app-analytics/")] pub struct Analytics { @@ -288,7 +360,7 @@ impl Batch { } #[inline(always)] - pub fn add(&mut self, conn: WsStatsConnection) { + pub fn add(&mut self, conn: Item) { let created_at = conn.created_at.to_offset(self.offset); let conn_duration_ms = conn @@ -322,7 +394,7 @@ impl Batch { #[cfg(feature = "analytics-max-concurrent")] let stop = StartStopEvent::new(start_s + (conn_duration_ms / 1000) as u32, false); - if !conn.is_open { + if conn.duration_ms.is_some() { #[cfg(feature = "analytics-max-concurrent")] self.start_stop_events.push(stop); } @@ -339,7 +411,7 @@ impl Batch { item.start_stop_events.push(start); #[cfg(feature = "analytics-max-concurrent")] - if !conn.is_open { + if conn.duration_ms.is_some() { item.start_stop_events.push(stop); } }; @@ -657,10 +729,15 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result(); - let options = mongodb::options::FindOptions::builder().sort(sort).build(); + let options = mongodb::options::FindOptions::builder() + .sort(sort) + .projection(Item::projection()) + .build(); let filter = doc! { "$and": and }; - let mut cursor = WsStatsConnection::cl().find(filter, options).await?; + let mut cursor = WsStatsConnection::cl_as::() + .find(filter, options) + .await?; let mut batch = Batch::new(offset_date.offset()); diff --git a/rs/packages/db/src/models/stream_connection/lite/mod.rs b/rs/packages/db/src/models/stream_connection/lite/mod.rs index c386868c..a2da9841 100644 --- a/rs/packages/db/src/models/stream_connection/lite/mod.rs +++ b/rs/packages/db/src/models/stream_connection/lite/mod.rs @@ -62,13 +62,16 @@ pub struct StreamConnectionLite { #[serde(skip_serializing_if = "is_false")] pub is_external_relay_redirect: bool, + #[serde(rename = "_m")] + #[serde(default)] + #[serde(skip_serializing_if = "is_false")] + pub manually_closed: bool, + #[serde(rename = "cl")] pub closed_at: Option, } impl StreamConnectionLite { - pub const KET_MANUALLY_CLOSED: &'static str = "_m"; - pub fn get_domain(full: &StreamConnection) -> Option { match full.request.headers.get("referer") { None => None, @@ -92,6 +95,7 @@ impl StreamConnectionLite { duration_ms: full.duration_ms, transfer_bytes: full.transfer_bytes, is_external_relay_redirect: full.is_external_relay_redirect, + manually_closed: full.manually_closed, created_at: full.created_at, closed_at: full.closed_at, } @@ -112,6 +116,7 @@ impl From for StreamConnectionLite { transfer_bytes: full.transfer_bytes, country_code: full.country_code, is_external_relay_redirect: full.is_external_relay_redirect, + manually_closed: full.manually_closed, created_at: full.created_at, closed_at: full.closed_at, } diff --git a/rs/packages/db/src/models/stream_connection/mod.rs b/rs/packages/db/src/models/stream_connection/mod.rs index 9e96d135..a1792836 100644 --- a/rs/packages/db/src/models/stream_connection/mod.rs +++ b/rs/packages/db/src/models/stream_connection/mod.rs @@ -14,6 +14,11 @@ pub mod index; pub mod lite; pub mod stats; +#[allow(clippy::bool_comparison)] +fn is_false(b: &bool) -> bool { + *b == false +} + crate::register!(StreamConnection); #[derive(Debug, Clone, Serialize, Deserialize, TS, JsonSchema)] @@ -41,15 +46,16 @@ pub struct StreamConnection { #[serde(default)] pub is_external_relay_redirect: bool, + #[serde(default)] + #[serde(skip_serializing_if = "is_false")] + #[serde(rename = "_manually_closed")] + pub manually_closed: bool, + pub request: Request, pub last_transfer_at: DateTime, pub closed_at: Option, } -impl StreamConnection { - pub const KEY_MANUALLY_CLOSED: &'static str = "_manually_closed"; -} - impl Model for StreamConnection { const CL_NAME: &'static str = "stream_connections"; const UID_LEN: usize = 12; diff --git a/rs/packages/stream/src/lib.rs b/rs/packages/stream/src/lib.rs index ae4c1d29..ae2fcd84 100644 --- a/rs/packages/stream/src/lib.rs +++ b/rs/packages/stream/src/lib.rs @@ -406,6 +406,7 @@ impl StreamHandler { duration_ms: Some(0), request, is_external_relay_redirect: true, + manually_closed: false, created_at: now, last_transfer_at: now, closed_at: Some(now), @@ -446,6 +447,7 @@ impl StreamHandler { request, created_at: now, is_external_relay_redirect: false, + manually_closed: false, last_transfer_at: now, closed_at: None, }