From 9866d51b304991ad5863930a20d32a6e8c121ded Mon Sep 17 00:00:00 2001 From: ramiroaisen <52116153+ramiroaisen@users.noreply.github.com> Date: Mon, 11 Mar 2024 17:52:07 -0300 Subject: [PATCH 1/3] feat: autoclean deployment ws stats connections --- defs/db/WsStatsConnection.ts | 1 + .../api/src/ws_stats/routes/connection.rs | 1 + rs/packages/db/src/models/deployment/mod.rs | 114 +++++++++++------- .../db/src/models/ws_stats_connection/mod.rs | 4 + 4 files changed, 74 insertions(+), 46 deletions(-) diff --git a/defs/db/WsStatsConnection.ts b/defs/db/WsStatsConnection.ts index 7e49470b..5bf50f21 100644 --- a/defs/db/WsStatsConnection.ts +++ b/defs/db/WsStatsConnection.ts @@ -16,4 +16,5 @@ export type WsStatsConnection = { re: number; ca: DateTime; cl: DateTime | null | undefined; + _m?: boolean; }; diff --git a/rs/packages/api/src/ws_stats/routes/connection.rs b/rs/packages/api/src/ws_stats/routes/connection.rs index b4cec761..d674eda7 100644 --- a/rs/packages/api/src/ws_stats/routes/connection.rs +++ b/rs/packages/api/src/ws_stats/routes/connection.rs @@ -188,6 +188,7 @@ impl WsConnectionHandler { reconnections, created_at, closed_at: None, + manually_closed: None, }; match WsStatsConnection::insert(&connection).await { diff --git a/rs/packages/db/src/models/deployment/mod.rs b/rs/packages/db/src/models/deployment/mod.rs index 36c4e440..53825bb9 100644 --- a/rs/packages/db/src/models/deployment/mod.rs +++ b/rs/packages/db/src/models/deployment/mod.rs @@ -2,7 +2,8 @@ use crate::{ account::recalculate_used_listeners_quota, run_transaction, stream_connection::{lite::StreamConnectionLite, StreamConnection}, - IdDocument, Model, + ws_stats_connection::WsStatsConnection, + Model, }; use futures_util::TryStreamExt; use mongodb::{bson::doc, IndexModel}; @@ -108,24 +109,6 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { deployment.id, ); - let filter = doc! { StreamConnection::KEY_DEPLOYMENT_ID: &deployment.id }; - let mut ids: Vec = vec![]; - { - let projection = doc! { crate::KEY_ID: 1 }; - - let options = mongodb::options::FindOptions::builder() - .projection(projection) - .build(); - - let mut cursor = StreamConnection::cl_as::() - .find(filter, options) - .await?; - - while let Some(item) = cursor.try_next().await? { - ids.push(item.id); - } - } - // this unwrap is enforced with the mongodb filter let closed_at = deployment.health_checked_at.unwrap(); { @@ -176,25 +159,19 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { }, ]; - let mut matched_count: u64 = 0; - - for chunk in ids.chunks(100_000) { - let filter = doc! { - StreamConnectionLite::KEY_ID: { "$in": chunk }, - StreamConnectionLite::KEY_IS_OPEN: true, - }; - - let r = StreamConnectionLite::cl() - .update_many(filter, update.clone(), None) - .await?; + let filter = doc! { + StreamConnectionLite::KEY_DEPLOYMENT_ID: &deployment.id, + StreamConnectionLite::KEY_IS_OPEN: true, + }; - matched_count += r.matched_count; - } + let r = StreamConnectionLite::cl() + .update_many(filter, update.clone(), None) + .await?; log::info!( target: "deployment-health", "closed {} stream_connections_lite for deployment {}", - matched_count, + r.matched_count, deployment.id, ); }; @@ -249,25 +226,70 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { }, ]; - let mut matched_count: u64 = 0; + let filter = doc! { + StreamConnection::KEY_DEPLOYMENT_ID: &deployment.id, + StreamConnection::KEY_IS_OPEN: true, + }; - for chunk in ids.chunks(100_000) { - let filter = doc! { - StreamConnection::KEY_ID: { "$in": &chunk }, - StreamConnection::KEY_IS_OPEN: true, - }; + let r = StreamConnection::cl() + .update_many(filter, update.clone(), None) + .await?; - let r = StreamConnection::cl() - .update_many(filter, update.clone(), None) - .await?; + log::info!( + target: "deployment-health", + "closed {} stream_connections for deployment {}", + r.matched_count, + deployment.id, + ); + } - matched_count += r.matched_count; - } + { + const KEY_CA: &str = const_str::concat!("$", WsStatsConnection::KEY_CREATED_AT); + + let update = vec![doc! { + "$set": { + WsStatsConnection::KEY_IS_OPEN: false, + WsStatsConnection::KEY_MANUALLY_CLOSED: true, + WsStatsConnection::KEY_CLOSED_AT: { + "$max": [ + { + "$dateAdd": { + "startDate": KEY_CA, + "unit": "millisecond", + "amount": 1, + } + }, + closed_at + ] + }, + WsStatsConnection::KEY_DURATION_MS: { + "$max": [ + 1, + { + "$dateDiff": { + "startDate": KEY_CA, + "endDate": closed_at, + "unit": "millisecond", + } + } + ] + } + } + }]; + + let filter = doc! { + WsStatsConnection::KEY_DEPLOYMENT_ID: &deployment.id, + WsStatsConnection::KEY_IS_OPEN: true, + }; + + let r = WsStatsConnection::cl() + .update_many(filter, update.clone(), None) + .await?; log::info!( target: "deployment-health", - "closed {} stream_connections for deployment {}", - matched_count, + "closed {} ws_stats_connections for deployment {}", + r.matched_count, deployment.id, ); } diff --git a/rs/packages/db/src/models/ws_stats_connection/mod.rs b/rs/packages/db/src/models/ws_stats_connection/mod.rs index 81ae66a0..15b79fa7 100644 --- a/rs/packages/db/src/models/ws_stats_connection/mod.rs +++ b/rs/packages/db/src/models/ws_stats_connection/mod.rs @@ -60,6 +60,10 @@ pub struct WsStatsConnection { // pub last_transfer_at: DateTime, #[serde(rename = "cl")] pub closed_at: Option, + + #[serde(rename = "_m")] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub manually_closed: Option, } impl WsStatsConnection { From 6514e097db02ac05a39c3f419cbc8eff21104400 Mon Sep 17 00:00:00 2001 From: ramiroaisen <52116153+ramiroaisen@users.noreply.github.com> Date: Mon, 11 Mar 2024 17:57:35 -0300 Subject: [PATCH 2/3] fix: remove unused clone --- rs/packages/db/src/models/deployment/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rs/packages/db/src/models/deployment/mod.rs b/rs/packages/db/src/models/deployment/mod.rs index 53825bb9..c3cc3d36 100644 --- a/rs/packages/db/src/models/deployment/mod.rs +++ b/rs/packages/db/src/models/deployment/mod.rs @@ -165,7 +165,7 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { }; let r = StreamConnectionLite::cl() - .update_many(filter, update.clone(), None) + .update_many(filter, update, None) .await?; log::info!( @@ -232,7 +232,7 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { }; let r = StreamConnection::cl() - .update_many(filter, update.clone(), None) + .update_many(filter, update, None) .await?; log::info!( @@ -283,7 +283,7 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { }; let r = WsStatsConnection::cl() - .update_many(filter, update.clone(), None) + .update_many(filter, update, None) .await?; log::info!( From 5bd7913e3ce049eed544f1e5378d9f26be91aa9e Mon Sep 17 00:00:00 2001 From: ramiroaisen <52116153+ramiroaisen@users.noreply.github.com> Date: Mon, 11 Mar 2024 18:10:19 -0300 Subject: [PATCH 3/3] feat: imrpove abnormally_closed field --- .../stream-connections/GET/Output.schema.json | 2 +- defs/db/Deployment.ts | 1 + defs/db/StreamConnection.ts | 2 +- defs/db/WsStatsConnection.ts | 2 +- rs/bin/openstream/src/main.rs | 1 + .../api/src/ws_stats/routes/connection.rs | 2 +- rs/packages/db/src/models/deployment/mod.rs | 31 ++++++++++++------- .../models/stream_connection/analytics/mod.rs | 16 ---------- .../src/models/stream_connection/lite/mod.rs | 12 +++---- .../db/src/models/stream_connection/mod.rs | 11 +++---- .../db/src/models/ws_stats_connection/mod.rs | 13 ++++---- rs/packages/stream/src/lib.rs | 4 +-- rs/patches/bson/src/tests/modules/oid.rs | 2 +- 13 files changed, 46 insertions(+), 53 deletions(-) diff --git a/defs/api/stream-connections/GET/Output.schema.json b/defs/api/stream-connections/GET/Output.schema.json index e6511173..0fbb4006 100644 --- a/defs/api/stream-connections/GET/Output.schema.json +++ b/defs/api/stream-connections/GET/Output.schema.json @@ -328,7 +328,7 @@ "default": false, "type": "boolean" }, - "_manually_closed": { + "_m": { "type": "boolean" }, "request": { diff --git a/defs/db/Deployment.ts b/defs/db/Deployment.ts index 44f4e452..67d2b5ee 100644 --- a/defs/db/Deployment.ts +++ b/defs/db/Deployment.ts @@ -14,4 +14,5 @@ export type Deployment = { updated_at: DateTime; health_checked_at: DateTime | null | undefined; dropped_at: DateTime | null | undefined; + _m: boolean; }; diff --git a/defs/db/StreamConnection.ts b/defs/db/StreamConnection.ts index e8b37520..2b6313db 100644 --- a/defs/db/StreamConnection.ts +++ b/defs/db/StreamConnection.ts @@ -14,7 +14,7 @@ export type StreamConnection = { country_code: CountryCode | null | undefined; ip: string; is_external_relay_redirect: boolean; - _manually_closed: boolean; + _m: boolean; request: Request; last_transfer_at: DateTime; closed_at: DateTime | null | undefined; diff --git a/defs/db/WsStatsConnection.ts b/defs/db/WsStatsConnection.ts index 5bf50f21..bd7c7a23 100644 --- a/defs/db/WsStatsConnection.ts +++ b/defs/db/WsStatsConnection.ts @@ -16,5 +16,5 @@ export type WsStatsConnection = { re: number; ca: DateTime; cl: DateTime | null | undefined; - _m?: boolean; + _m: boolean; }; diff --git a/rs/bin/openstream/src/main.rs b/rs/bin/openstream/src/main.rs index f52197db..08e2c963 100644 --- a/rs/bin/openstream/src/main.rs +++ b/rs/bin/openstream/src/main.rs @@ -426,6 +426,7 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> { updated_at: now, health_checked_at: Some(now), dropped_at: None, + abnormally_closed: false, }; let config::Config { diff --git a/rs/packages/api/src/ws_stats/routes/connection.rs b/rs/packages/api/src/ws_stats/routes/connection.rs index d674eda7..87e1b772 100644 --- a/rs/packages/api/src/ws_stats/routes/connection.rs +++ b/rs/packages/api/src/ws_stats/routes/connection.rs @@ -188,7 +188,7 @@ impl WsConnectionHandler { reconnections, created_at, closed_at: None, - manually_closed: None, + abnormally_closed: false, }; match WsStatsConnection::insert(&connection).await { diff --git a/rs/packages/db/src/models/deployment/mod.rs b/rs/packages/db/src/models/deployment/mod.rs index c3cc3d36..8f83c4ec 100644 --- a/rs/packages/db/src/models/deployment/mod.rs +++ b/rs/packages/db/src/models/deployment/mod.rs @@ -14,6 +14,11 @@ use ts_rs::TS; crate::register!(Deployment); +#[allow(clippy::bool_comparison)] +fn is_false(v: &bool) -> bool { + *v == false +} + #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[ts(export, export_to = "../../../defs/db/")] #[serde(rename_all = "snake_case")] @@ -50,6 +55,10 @@ pub struct Deployment { pub health_checked_at: Option, pub dropped_at: Option, + + #[serde(rename = "_m")] + #[serde(default, skip_serializing_if = "is_false")] + pub abnormally_closed: bool, } #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize, TS)] @@ -62,10 +71,6 @@ pub enum DeploymentState { Closed, } -impl Deployment { - pub const KEY_MANUALLY_CLOSED: &'static str = "_manually_closed"; -} - impl Model for Deployment { const UID_LEN: usize = 8; const CL_NAME: &'static str = "deployments"; @@ -111,6 +116,7 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { let closed_at = deployment.health_checked_at.unwrap(); + // StreamConnectionLite { const KEY_CA: &str = const_str::concat!("$", StreamConnectionLite::KEY_CREATED_AT); const KEY_DU: &str = const_str::concat!("$", StreamConnectionLite::KEY_DURATION_MS); @@ -119,7 +125,7 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { doc! { "$set": { StreamConnectionLite::KEY_IS_OPEN: false, - StreamConnection::KEY_MANUALLY_CLOSED: true, + StreamConnection::KEY_ABNORNALLY_CLOSED: true, StreamConnectionLite::KEY_CLOSED_AT: { "$max": [ { @@ -176,6 +182,7 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { ); }; + // StreamConnection { const KEY_CREATED_AT: &str = const_str::concat!("$", StreamConnection::KEY_CREATED_AT); const KEY_DURATION_MS: &str = const_str::concat!("$", StreamConnection::KEY_DURATION_MS); @@ -185,7 +192,7 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { doc! { "$set": { StreamConnection::KEY_IS_OPEN: false, - StreamConnection::KEY_MANUALLY_CLOSED: true, + StreamConnection::KEY_ABNORNALLY_CLOSED: true, StreamConnection::KEY_CLOSED_AT: { "$max": [ { @@ -241,15 +248,16 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { r.matched_count, deployment.id, ); - } + }; + // WsStatsConnection { const KEY_CA: &str = const_str::concat!("$", WsStatsConnection::KEY_CREATED_AT); let update = vec![doc! { "$set": { WsStatsConnection::KEY_IS_OPEN: false, - WsStatsConnection::KEY_MANUALLY_CLOSED: true, + WsStatsConnection::KEY_ABNORMALLY_CLOSED: true, WsStatsConnection::KEY_CLOSED_AT: { "$max": [ { @@ -292,13 +300,14 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { r.matched_count, deployment.id, ); - } + }; + // Deployment { let update = doc! { "$set": { Deployment::KEY_STATE: DeploymentState::KEY_ENUM_VARIANT_CLOSED, - Deployment::KEY_MANUALLY_CLOSED: true, + Deployment::KEY_ABNORMALLY_CLOSED: true, Deployment::KEY_DROPPED_AT: closed_at, } }; @@ -324,7 +333,7 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> { target: "deployment-health", "used listeners quota recalculated for all accounts", ); - } + }; } Ok(()) diff --git a/rs/packages/db/src/models/stream_connection/analytics/mod.rs b/rs/packages/db/src/models/stream_connection/analytics/mod.rs index cb61e411..04318a9f 100644 --- a/rs/packages/db/src/models/stream_connection/analytics/mod.rs +++ b/rs/packages/db/src/models/stream_connection/analytics/mod.rs @@ -19,13 +19,9 @@ use crate::{station::Station, stream_connection::lite::StreamConnectionLite, Mod #[derive(Debug, Serialize, Deserialize)] #[macros::keys] pub struct Item { - // #[serde(rename = "_id")] - // pub id: String, #[serde(rename = "st")] pub station_id: String, - //#[serde(rename = "op")] - //pub is_open: bool, #[serde(rename = "ip")] #[serde(with = "serde_util::ip")] pub ip: IpAddr, @@ -52,18 +48,6 @@ pub struct Item { #[serde(rename = "ca")] pub created_at: DateTime, - // #[serde(rename = "re")] - // #[serde(default)] - // #[serde(skip_serializing_if = "is_false")] - // pub is_external_relay_redirect: bool, - - // #[serde(rename = "_m")] - // #[serde(default)] - // #[serde(skip_serializing_if = "is_false")] - // pub manually_closed: bool, - - // #[serde(rename = "cl")] - // pub closed_at: Option, } impl Item { diff --git a/rs/packages/db/src/models/stream_connection/lite/mod.rs b/rs/packages/db/src/models/stream_connection/lite/mod.rs index e54a27c3..039448c7 100644 --- a/rs/packages/db/src/models/stream_connection/lite/mod.rs +++ b/rs/packages/db/src/models/stream_connection/lite/mod.rs @@ -62,14 +62,12 @@ pub struct StreamConnectionLite { pub created_at: DateTime, #[serde(rename = "re")] - #[serde(default)] - #[serde(skip_serializing_if = "is_false")] + #[serde(default, skip_serializing_if = "is_false")] pub is_external_relay_redirect: bool, #[serde(rename = "_m")] - #[serde(default)] - #[serde(skip_serializing_if = "is_false")] - pub manually_closed: bool, + #[serde(default, skip_serializing_if = "is_false")] + pub abnormally_closed: bool, #[serde(rename = "cl")] pub closed_at: Option, @@ -100,7 +98,7 @@ impl StreamConnectionLite { duration_ms: full.duration_ms, transfer_bytes: full.transfer_bytes, is_external_relay_redirect: full.is_external_relay_redirect, - manually_closed: full.manually_closed, + abnormally_closed: full.abnornally_closed, created_at: full.created_at, closed_at: full.closed_at, } @@ -122,7 +120,7 @@ impl From for StreamConnectionLite { transfer_bytes: full.transfer_bytes, country_code: full.country_code, is_external_relay_redirect: full.is_external_relay_redirect, - manually_closed: full.manually_closed, + abnormally_closed: full.abnornally_closed, created_at: full.created_at, closed_at: full.closed_at, } diff --git a/rs/packages/db/src/models/stream_connection/mod.rs b/rs/packages/db/src/models/stream_connection/mod.rs index 6003bad7..562b51b0 100644 --- a/rs/packages/db/src/models/stream_connection/mod.rs +++ b/rs/packages/db/src/models/stream_connection/mod.rs @@ -15,8 +15,8 @@ pub mod lite; pub mod stats; #[allow(clippy::bool_comparison)] -fn is_false(b: &bool) -> bool { - *b == false +fn is_false(v: &bool) -> bool { + *v == false } crate::register!(StreamConnection); @@ -46,10 +46,9 @@ pub struct StreamConnection { #[serde(default)] pub is_external_relay_redirect: bool, - #[serde(default)] - #[serde(skip_serializing_if = "is_false")] - #[serde(rename = "_manually_closed")] - pub manually_closed: bool, + #[serde(rename = "_m")] + #[serde(default, skip_serializing_if = "is_false")] + pub abnornally_closed: bool, pub request: Request, pub last_transfer_at: DateTime, diff --git a/rs/packages/db/src/models/ws_stats_connection/mod.rs b/rs/packages/db/src/models/ws_stats_connection/mod.rs index 15b79fa7..7c7657cf 100644 --- a/rs/packages/db/src/models/ws_stats_connection/mod.rs +++ b/rs/packages/db/src/models/ws_stats_connection/mod.rs @@ -9,6 +9,11 @@ use ts_rs::TS; crate::register!(WsStatsConnection); +#[allow(clippy::bool_comparison)] +fn is_false(v: &bool) -> bool { + *v == false +} + #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[ts(export, export_to = "../../../defs/db/")] #[serde(rename_all = "snake_case")] @@ -62,12 +67,8 @@ pub struct WsStatsConnection { pub closed_at: Option, #[serde(rename = "_m")] - #[serde(default, skip_serializing_if = "Option::is_none")] - pub manually_closed: Option, -} - -impl WsStatsConnection { - pub const KEY_MANNUALLY_CLOSED: &'static str = "_m"; + #[serde(default, skip_serializing_if = "is_false")] + pub abnormally_closed: bool, } impl Model for WsStatsConnection { diff --git a/rs/packages/stream/src/lib.rs b/rs/packages/stream/src/lib.rs index ae2fcd84..1dc32c1e 100644 --- a/rs/packages/stream/src/lib.rs +++ b/rs/packages/stream/src/lib.rs @@ -406,7 +406,7 @@ impl StreamHandler { duration_ms: Some(0), request, is_external_relay_redirect: true, - manually_closed: false, + abnornally_closed: false, created_at: now, last_transfer_at: now, closed_at: Some(now), @@ -447,7 +447,7 @@ impl StreamHandler { request, created_at: now, is_external_relay_redirect: false, - manually_closed: false, + abnornally_closed: false, last_transfer_at: now, closed_at: None, } diff --git a/rs/patches/bson/src/tests/modules/oid.rs b/rs/patches/bson/src/tests/modules/oid.rs index b875eaa2..46f4c46c 100644 --- a/rs/patches/bson/src/tests/modules/oid.rs +++ b/rs/patches/bson/src/tests/modules/oid.rs @@ -27,7 +27,7 @@ fn byte_string_oid() { } #[test] -#[allow(clippy::eq_op)] +#[allow(clippy::bool_comparison)] fn oid_equals() { let _guard = LOCK.run_concurrently(); let oid = ObjectId::new();