Skip to content

Commit

Permalink
feat: improve deployment closing (#293)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Mar 11, 2024
2 parents 715b252 + 07c2052 commit f1fb384
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 6 deletions.
37 changes: 37 additions & 0 deletions rs/packages/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use async_trait::async_trait;
use error::CheckCollectionError;
use futures_util::{Future, TryStreamExt};
use log::*;
use mongodb::bson::Bson;
use mongodb::error::Result as MongoResult;
use mongodb::options::{
CountOptions, FindOptions, Hint, ReplaceOptions, SelectionCriteria, SessionOptions,
Expand All @@ -19,6 +20,7 @@ use schemars::JsonSchema;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_util::DateTime;
use std::borrow::Borrow;
use std::collections::HashSet;
use ts_rs::TS;

pub mod error;
Expand Down Expand Up @@ -126,6 +128,41 @@ pub trait Model: Sized + Unpin + Send + Sync + Serialize + DeserializeOwned {
Self::cl_as()
}

async fn distinct_string<T: FromIterator<String>>(
key: &str,
filter: impl Into<Option<Document>> + Send,
) -> MongoResult<T> {
let items = Self::cl().distinct(key, filter, None).await?;
let set = items
.into_iter()
.filter_map(|s| match s {
Bson::String(s) => Some(s),
_ => None,
})
.collect();

Ok(set)
}

async fn distinct_string_with_session<T: FromIterator<String>>(
key: &str,
filter: impl Into<Option<Document>> + Send,
session: &mut ClientSession,
) -> MongoResult<T> {
let items = Self::cl()
.distinct_with_session(key, filter, None, session)
.await?;
let set = items
.into_iter()
.filter_map(|s| match s {
Bson::String(s) => Some(s),
_ => None,
})
.collect();

Ok(set)
}

fn indexes() -> Vec<IndexModel> {
vec![]
}
Expand Down
64 changes: 58 additions & 6 deletions rs/packages/db/src/models/deployment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures_util::TryStreamExt;
use mongodb::{bson::doc, IndexModel};
use serde::{Deserialize, Serialize};
use serde_util::DateTime;
use std::net::IpAddr;
use std::{collections::HashSet, net::IpAddr};
use ts_rs::TS;

crate::register!(Deployment);
Expand Down Expand Up @@ -93,19 +93,71 @@ impl Model for Deployment {
}

pub async fn check_now() -> Result<(), mongodb::error::Error> {
let limit: DateTime = (time::OffsetDateTime::now_utc()
let active_time_limit: DateTime = (time::OffsetDateTime::now_utc()
- time::Duration::seconds(constants::DEPLOYMENT_HEALTH_CHECK_SHUTDOWN_DELAY_SECS as i64))
.into();

let filter = doc! {
let active_filter = doc! {
"$and": [
{ Deployment::KEY_STATE: { "$ne": DeploymentState::KEY_ENUM_VARIANT_CLOSED } },
{ Deployment::KEY_HEALTH_CHECKED_AT: { "$ne": null } },
{ Deployment::KEY_HEALTH_CHECKED_AT: { "$lt": limit } },
],
{ Deployment::KEY_HEALTH_CHECKED_AT: { "$gte": active_time_limit } },
]
};

let mut r = Deployment::cl().find(filter, None).await?;
let active_deployment_ids: Vec<String> =
Deployment::distinct_string(crate::KEY_ID, active_filter).await?;

let mut to_close_deployment_ids = HashSet::<String>::new();

// StreamConnection
{
let filter = doc! {
StreamConnection::KEY_IS_OPEN: true,
StreamConnection::KEY_DEPLOYMENT_ID: { "$nin": &active_deployment_ids },
};

let ids: Vec<String> =
StreamConnection::distinct_string(StreamConnection::KEY_DEPLOYMENT_ID, filter).await?;

to_close_deployment_ids.extend(ids);
};

// StreamConnectionLite
{
let filter = doc! {
StreamConnectionLite::KEY_IS_OPEN: true,
StreamConnectionLite::KEY_DEPLOYMENT_ID: { "$nin": &active_deployment_ids },
};

let ids: Vec<String> =
StreamConnectionLite::distinct_string(StreamConnection::KEY_DEPLOYMENT_ID, filter).await?;

to_close_deployment_ids.extend(ids);
}

// WsStatsConnection
{
let filter = doc! {
WsStatsConnection::KEY_IS_OPEN: true,
WsStatsConnection::KEY_CURRENT_DEPLOYMENT_ID: { "$nin": &active_deployment_ids },
};

let ids: Vec<String> =
WsStatsConnection::distinct_string(WsStatsConnection::KEY_CURRENT_DEPLOYMENT_ID, filter)
.await?;

to_close_deployment_ids.extend(ids);
}

if to_close_deployment_ids.is_empty() {
return Ok(());
}

let deployment_ids: Vec<String> = to_close_deployment_ids.into_iter().collect();
let deployment_filter = doc! { Deployment::KEY_ID: { "$in": deployment_ids } };

let mut r = Deployment::cl().find(deployment_filter, None).await?;

while let Some(deployment) = r.try_next().await? {
log::info!(
Expand Down

0 comments on commit f1fb384

Please sign in to comment.