Skip to content

Commit

Permalink
feat: autoclean deployment ws stats connections
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen committed Mar 11, 2024
1 parent aa7df28 commit 9866d51
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 46 deletions.
1 change: 1 addition & 0 deletions defs/db/WsStatsConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ export type WsStatsConnection = {
re: number;
ca: DateTime;
cl: DateTime | null | undefined;
_m?: boolean;
};
1 change: 1 addition & 0 deletions rs/packages/api/src/ws_stats/routes/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl WsConnectionHandler {
reconnections,
created_at,
closed_at: None,
manually_closed: None,
};

match WsStatsConnection::insert(&connection).await {
Expand Down
114 changes: 68 additions & 46 deletions rs/packages/db/src/models/deployment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::{
account::recalculate_used_listeners_quota,
run_transaction,
stream_connection::{lite::StreamConnectionLite, StreamConnection},
IdDocument, Model,
ws_stats_connection::WsStatsConnection,
Model,
};
use futures_util::TryStreamExt;
use mongodb::{bson::doc, IndexModel};
Expand Down Expand Up @@ -108,24 +109,6 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> {
deployment.id,
);

let filter = doc! { StreamConnection::KEY_DEPLOYMENT_ID: &deployment.id };
let mut ids: Vec<String> = vec![];
{
let projection = doc! { crate::KEY_ID: 1 };

let options = mongodb::options::FindOptions::builder()
.projection(projection)
.build();

let mut cursor = StreamConnection::cl_as::<IdDocument>()
.find(filter, options)
.await?;

while let Some(item) = cursor.try_next().await? {
ids.push(item.id);
}
}
// this unwrap is enforced with the mongodb filter
let closed_at = deployment.health_checked_at.unwrap();

{
Expand Down Expand Up @@ -176,25 +159,19 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> {
},
];

let mut matched_count: u64 = 0;

for chunk in ids.chunks(100_000) {
let filter = doc! {
StreamConnectionLite::KEY_ID: { "$in": chunk },
StreamConnectionLite::KEY_IS_OPEN: true,
};

let r = StreamConnectionLite::cl()
.update_many(filter, update.clone(), None)
.await?;
let filter = doc! {
StreamConnectionLite::KEY_DEPLOYMENT_ID: &deployment.id,
StreamConnectionLite::KEY_IS_OPEN: true,
};

matched_count += r.matched_count;
}
let r = StreamConnectionLite::cl()
.update_many(filter, update.clone(), None)
.await?;

log::info!(
target: "deployment-health",
"closed {} stream_connections_lite for deployment {}",
matched_count,
r.matched_count,
deployment.id,
);
};
Expand Down Expand Up @@ -249,25 +226,70 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> {
},
];

let mut matched_count: u64 = 0;
let filter = doc! {
StreamConnection::KEY_DEPLOYMENT_ID: &deployment.id,
StreamConnection::KEY_IS_OPEN: true,
};

for chunk in ids.chunks(100_000) {
let filter = doc! {
StreamConnection::KEY_ID: { "$in": &chunk },
StreamConnection::KEY_IS_OPEN: true,
};
let r = StreamConnection::cl()
.update_many(filter, update.clone(), None)
.await?;

let r = StreamConnection::cl()
.update_many(filter, update.clone(), None)
.await?;
log::info!(
target: "deployment-health",
"closed {} stream_connections for deployment {}",
r.matched_count,
deployment.id,
);
}

matched_count += r.matched_count;
}
{
const KEY_CA: &str = const_str::concat!("$", WsStatsConnection::KEY_CREATED_AT);

let update = vec![doc! {
"$set": {
WsStatsConnection::KEY_IS_OPEN: false,
WsStatsConnection::KEY_MANUALLY_CLOSED: true,
WsStatsConnection::KEY_CLOSED_AT: {
"$max": [
{
"$dateAdd": {
"startDate": KEY_CA,
"unit": "millisecond",
"amount": 1,
}
},
closed_at
]
},
WsStatsConnection::KEY_DURATION_MS: {
"$max": [
1,
{
"$dateDiff": {
"startDate": KEY_CA,
"endDate": closed_at,
"unit": "millisecond",
}
}
]
}
}
}];

let filter = doc! {
WsStatsConnection::KEY_DEPLOYMENT_ID: &deployment.id,
WsStatsConnection::KEY_IS_OPEN: true,
};

let r = WsStatsConnection::cl()
.update_many(filter, update.clone(), None)
.await?;

log::info!(
target: "deployment-health",
"closed {} stream_connections for deployment {}",
matched_count,
"closed {} ws_stats_connections for deployment {}",
r.matched_count,
deployment.id,
);
}
Expand Down
4 changes: 4 additions & 0 deletions rs/packages/db/src/models/ws_stats_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pub struct WsStatsConnection {
// pub last_transfer_at: DateTime,
#[serde(rename = "cl")]
pub closed_at: Option<DateTime>,

#[serde(rename = "_m")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub manually_closed: Option<bool>,
}

impl WsStatsConnection {
Expand Down

0 comments on commit 9866d51

Please sign in to comment.