diff --git a/rs/packages/db/src/lib.rs b/rs/packages/db/src/lib.rs index 8639c3d9..3da1dd9e 100644 --- a/rs/packages/db/src/lib.rs +++ b/rs/packages/db/src/lib.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use error::CheckCollectionError; use futures_util::{Future, TryStreamExt}; use log::*; +use mongodb::bson::Bson; use mongodb::error::Result as MongoResult; use mongodb::options::{ CountOptions, FindOptions, Hint, ReplaceOptions, SelectionCriteria, SessionOptions, @@ -19,6 +20,7 @@ use schemars::JsonSchema; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_util::DateTime; use std::borrow::Borrow; +use std::collections::HashSet; use ts_rs::TS; pub mod error; @@ -126,6 +128,41 @@ pub trait Model: Sized + Unpin + Send + Sync + Serialize + DeserializeOwned { Self::cl_as() } + async fn distinct_string>( + key: &str, + filter: impl Into> + Send, + ) -> MongoResult { + let items = Self::cl().distinct(key, filter, None).await?; + let set = items + .into_iter() + .filter_map(|s| match s { + Bson::String(s) => Some(s), + _ => None, + }) + .collect(); + + Ok(set) + } + + async fn distinct_string_with_session>( + key: &str, + filter: impl Into> + Send, + session: &mut ClientSession, + ) -> MongoResult { + let items = Self::cl() + .distinct_with_session(key, filter, None, session) + .await?; + let set = items + .into_iter() + .filter_map(|s| match s { + Bson::String(s) => Some(s), + _ => None, + }) + .collect(); + + Ok(set) + } + fn indexes() -> Vec { vec![] } diff --git a/rs/packages/db/src/models/deployment/mod.rs b/rs/packages/db/src/models/deployment/mod.rs index 8c82e534..99d860e2 100644 --- a/rs/packages/db/src/models/deployment/mod.rs +++ b/rs/packages/db/src/models/deployment/mod.rs @@ -9,7 +9,7 @@ use futures_util::TryStreamExt; use mongodb::{bson::doc, IndexModel}; use serde::{Deserialize, Serialize}; use serde_util::DateTime; -use std::net::IpAddr; +use std::{collections::HashSet, net::IpAddr}; use ts_rs::TS; crate::register!(Deployment); @@ -93,19 +93,71 @@ impl Model for Deployment { } pub async fn check_now() -> Result<(), mongodb::error::Error> { - let limit: DateTime = (time::OffsetDateTime::now_utc() + let active_time_limit: DateTime = (time::OffsetDateTime::now_utc() - time::Duration::seconds(constants::DEPLOYMENT_HEALTH_CHECK_SHUTDOWN_DELAY_SECS as i64)) .into(); - let filter = doc! { + let active_filter = doc! { "$and": [ { Deployment::KEY_STATE: { "$ne": DeploymentState::KEY_ENUM_VARIANT_CLOSED } }, { Deployment::KEY_HEALTH_CHECKED_AT: { "$ne": null } }, - { Deployment::KEY_HEALTH_CHECKED_AT: { "$lt": limit } }, - ], + { Deployment::KEY_HEALTH_CHECKED_AT: { "$gte": active_time_limit } }, + ] }; - let mut r = Deployment::cl().find(filter, None).await?; + let active_deployment_ids: Vec = + Deployment::distinct_string(crate::KEY_ID, active_filter).await?; + + let mut to_close_deployment_ids = HashSet::::new(); + + // StreamConnection + { + let filter = doc! { + StreamConnection::KEY_IS_OPEN: true, + StreamConnection::KEY_DEPLOYMENT_ID: { "$nin": &active_deployment_ids }, + }; + + let ids: Vec = + StreamConnection::distinct_string(StreamConnection::KEY_DEPLOYMENT_ID, filter).await?; + + to_close_deployment_ids.extend(ids); + }; + + // StreamConnectionLite + { + let filter = doc! { + StreamConnectionLite::KEY_IS_OPEN: true, + StreamConnectionLite::KEY_DEPLOYMENT_ID: { "$nin": &active_deployment_ids }, + }; + + let ids: Vec = + StreamConnectionLite::distinct_string(StreamConnection::KEY_DEPLOYMENT_ID, filter).await?; + + to_close_deployment_ids.extend(ids); + } + + // WsStatsConnection + { + let filter = doc! { + WsStatsConnection::KEY_IS_OPEN: true, + WsStatsConnection::KEY_CURRENT_DEPLOYMENT_ID: { "$nin": &active_deployment_ids }, + }; + + let ids: Vec = + WsStatsConnection::distinct_string(WsStatsConnection::KEY_CURRENT_DEPLOYMENT_ID, filter) + .await?; + + to_close_deployment_ids.extend(ids); + } + + if to_close_deployment_ids.is_empty() { + return Ok(()); + } + + let deployment_ids: Vec = to_close_deployment_ids.into_iter().collect(); + let deployment_filter = doc! { Deployment::KEY_ID: { "$in": deployment_ids } }; + + let mut r = Deployment::cl().find(deployment_filter, None).await?; while let Some(deployment) = r.try_next().await? { log::info!(