Skip to content

Commit

Permalink
feat: autoclean deployment ws stats connections (#291)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Mar 11, 2024
2 parents aa7df28 + 5bd7913 commit fe3dce0
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 92 deletions.
2 changes: 1 addition & 1 deletion defs/api/stream-connections/GET/Output.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@
"default": false,
"type": "boolean"
},
"_manually_closed": {
"_m": {
"type": "boolean"
},
"request": {
Expand Down
1 change: 1 addition & 0 deletions defs/db/Deployment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ export type Deployment = {
updated_at: DateTime;
health_checked_at: DateTime | null | undefined;
dropped_at: DateTime | null | undefined;
_m: boolean;
};
2 changes: 1 addition & 1 deletion defs/db/StreamConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export type StreamConnection = {
country_code: CountryCode | null | undefined;
ip: string;
is_external_relay_redirect: boolean;
_manually_closed: boolean;
_m: boolean;
request: Request;
last_transfer_at: DateTime;
closed_at: DateTime | null | undefined;
Expand Down
1 change: 1 addition & 0 deletions defs/db/WsStatsConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ export type WsStatsConnection = {
re: number;
ca: DateTime;
cl: DateTime | null | undefined;
_m: boolean;
};
1 change: 1 addition & 0 deletions rs/bin/openstream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> {
updated_at: now,
health_checked_at: Some(now),
dropped_at: None,
abnormally_closed: false,
};

let config::Config {
Expand Down
1 change: 1 addition & 0 deletions rs/packages/api/src/ws_stats/routes/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl WsConnectionHandler {
reconnections,
created_at,
closed_at: None,
abnormally_closed: false,
};

match WsStatsConnection::insert(&connection).await {
Expand Down
141 changes: 86 additions & 55 deletions rs/packages/db/src/models/deployment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::{
account::recalculate_used_listeners_quota,
run_transaction,
stream_connection::{lite::StreamConnectionLite, StreamConnection},
IdDocument, Model,
ws_stats_connection::WsStatsConnection,
Model,
};
use futures_util::TryStreamExt;
use mongodb::{bson::doc, IndexModel};
Expand All @@ -13,6 +14,11 @@ use ts_rs::TS;

crate::register!(Deployment);

#[allow(clippy::bool_comparison)]
fn is_false(v: &bool) -> bool {
*v == false
}

#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[ts(export, export_to = "../../../defs/db/")]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -49,6 +55,10 @@ pub struct Deployment {
pub health_checked_at: Option<DateTime>,

pub dropped_at: Option<DateTime>,

#[serde(rename = "_m")]
#[serde(default, skip_serializing_if = "is_false")]
pub abnormally_closed: bool,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize, TS)]
Expand All @@ -61,10 +71,6 @@ pub enum DeploymentState {
Closed,
}

impl Deployment {
pub const KEY_MANUALLY_CLOSED: &'static str = "_manually_closed";
}

impl Model for Deployment {
const UID_LEN: usize = 8;
const CL_NAME: &'static str = "deployments";
Expand Down Expand Up @@ -108,26 +114,9 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> {
deployment.id,
);

let filter = doc! { StreamConnection::KEY_DEPLOYMENT_ID: &deployment.id };
let mut ids: Vec<String> = vec![];
{
let projection = doc! { crate::KEY_ID: 1 };

let options = mongodb::options::FindOptions::builder()
.projection(projection)
.build();

let mut cursor = StreamConnection::cl_as::<IdDocument>()
.find(filter, options)
.await?;

while let Some(item) = cursor.try_next().await? {
ids.push(item.id);
}
}
// this unwrap is enforced with the mongodb filter
let closed_at = deployment.health_checked_at.unwrap();

// StreamConnectionLite
{
const KEY_CA: &str = const_str::concat!("$", StreamConnectionLite::KEY_CREATED_AT);
const KEY_DU: &str = const_str::concat!("$", StreamConnectionLite::KEY_DURATION_MS);
Expand All @@ -136,7 +125,7 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> {
doc! {
"$set": {
StreamConnectionLite::KEY_IS_OPEN: false,
StreamConnection::KEY_MANUALLY_CLOSED: true,
StreamConnection::KEY_ABNORNALLY_CLOSED: true,
StreamConnectionLite::KEY_CLOSED_AT: {
"$max": [
{
Expand Down Expand Up @@ -176,29 +165,24 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> {
},
];

let mut matched_count: u64 = 0;

for chunk in ids.chunks(100_000) {
let filter = doc! {
StreamConnectionLite::KEY_ID: { "$in": chunk },
StreamConnectionLite::KEY_IS_OPEN: true,
};

let r = StreamConnectionLite::cl()
.update_many(filter, update.clone(), None)
.await?;
let filter = doc! {
StreamConnectionLite::KEY_DEPLOYMENT_ID: &deployment.id,
StreamConnectionLite::KEY_IS_OPEN: true,
};

matched_count += r.matched_count;
}
let r = StreamConnectionLite::cl()
.update_many(filter, update, None)
.await?;

log::info!(
target: "deployment-health",
"closed {} stream_connections_lite for deployment {}",
matched_count,
r.matched_count,
deployment.id,
);
};

// StreamConnection
{
const KEY_CREATED_AT: &str = const_str::concat!("$", StreamConnection::KEY_CREATED_AT);
const KEY_DURATION_MS: &str = const_str::concat!("$", StreamConnection::KEY_DURATION_MS);
Expand All @@ -208,7 +192,7 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> {
doc! {
"$set": {
StreamConnection::KEY_IS_OPEN: false,
StreamConnection::KEY_MANUALLY_CLOSED: true,
StreamConnection::KEY_ABNORNALLY_CLOSED: true,
StreamConnection::KEY_CLOSED_AT: {
"$max": [
{
Expand Down Expand Up @@ -249,34 +233,81 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> {
},
];

let mut matched_count: u64 = 0;
let filter = doc! {
StreamConnection::KEY_DEPLOYMENT_ID: &deployment.id,
StreamConnection::KEY_IS_OPEN: true,
};

let r = StreamConnection::cl()
.update_many(filter, update, None)
.await?;

log::info!(
target: "deployment-health",
"closed {} stream_connections for deployment {}",
r.matched_count,
deployment.id,
);
};

for chunk in ids.chunks(100_000) {
let filter = doc! {
StreamConnection::KEY_ID: { "$in": &chunk },
StreamConnection::KEY_IS_OPEN: true,
};
// WsStatsConnection
{
const KEY_CA: &str = const_str::concat!("$", WsStatsConnection::KEY_CREATED_AT);

let r = StreamConnection::cl()
.update_many(filter, update.clone(), None)
.await?;
let update = vec![doc! {
"$set": {
WsStatsConnection::KEY_IS_OPEN: false,
WsStatsConnection::KEY_ABNORMALLY_CLOSED: true,
WsStatsConnection::KEY_CLOSED_AT: {
"$max": [
{
"$dateAdd": {
"startDate": KEY_CA,
"unit": "millisecond",
"amount": 1,
}
},
closed_at
]
},
WsStatsConnection::KEY_DURATION_MS: {
"$max": [
1,
{
"$dateDiff": {
"startDate": KEY_CA,
"endDate": closed_at,
"unit": "millisecond",
}
}
]
}
}
}];

matched_count += r.matched_count;
}
let filter = doc! {
WsStatsConnection::KEY_DEPLOYMENT_ID: &deployment.id,
WsStatsConnection::KEY_IS_OPEN: true,
};

let r = WsStatsConnection::cl()
.update_many(filter, update, None)
.await?;

log::info!(
target: "deployment-health",
"closed {} stream_connections for deployment {}",
matched_count,
"closed {} ws_stats_connections for deployment {}",
r.matched_count,
deployment.id,
);
}
};

// Deployment
{
let update = doc! {
"$set": {
Deployment::KEY_STATE: DeploymentState::KEY_ENUM_VARIANT_CLOSED,
Deployment::KEY_MANUALLY_CLOSED: true,
Deployment::KEY_ABNORMALLY_CLOSED: true,
Deployment::KEY_DROPPED_AT: closed_at,
}
};
Expand All @@ -302,7 +333,7 @@ pub async fn check_now() -> Result<(), mongodb::error::Error> {
target: "deployment-health",
"used listeners quota recalculated for all accounts",
);
}
};
}

Ok(())
Expand Down
16 changes: 0 additions & 16 deletions rs/packages/db/src/models/stream_connection/analytics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@ use crate::{station::Station, stream_connection::lite::StreamConnectionLite, Mod
#[derive(Debug, Serialize, Deserialize)]
#[macros::keys]
pub struct Item {
// #[serde(rename = "_id")]
// pub id: String,
#[serde(rename = "st")]
pub station_id: String,

//#[serde(rename = "op")]
//pub is_open: bool,
#[serde(rename = "ip")]
#[serde(with = "serde_util::ip")]
pub ip: IpAddr,
Expand All @@ -52,18 +48,6 @@ pub struct Item {

#[serde(rename = "ca")]
pub created_at: DateTime,
// #[serde(rename = "re")]
// #[serde(default)]
// #[serde(skip_serializing_if = "is_false")]
// pub is_external_relay_redirect: bool,

// #[serde(rename = "_m")]
// #[serde(default)]
// #[serde(skip_serializing_if = "is_false")]
// pub manually_closed: bool,

// #[serde(rename = "cl")]
// pub closed_at: Option<DateTime>,
}

impl Item {
Expand Down
12 changes: 5 additions & 7 deletions rs/packages/db/src/models/stream_connection/lite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,12 @@ pub struct StreamConnectionLite {
pub created_at: DateTime,

#[serde(rename = "re")]
#[serde(default)]
#[serde(skip_serializing_if = "is_false")]
#[serde(default, skip_serializing_if = "is_false")]
pub is_external_relay_redirect: bool,

#[serde(rename = "_m")]
#[serde(default)]
#[serde(skip_serializing_if = "is_false")]
pub manually_closed: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub abnormally_closed: bool,

#[serde(rename = "cl")]
pub closed_at: Option<DateTime>,
Expand Down Expand Up @@ -100,7 +98,7 @@ impl StreamConnectionLite {
duration_ms: full.duration_ms,
transfer_bytes: full.transfer_bytes,
is_external_relay_redirect: full.is_external_relay_redirect,
manually_closed: full.manually_closed,
abnormally_closed: full.abnornally_closed,
created_at: full.created_at,
closed_at: full.closed_at,
}
Expand All @@ -122,7 +120,7 @@ impl From<StreamConnection> for StreamConnectionLite {
transfer_bytes: full.transfer_bytes,
country_code: full.country_code,
is_external_relay_redirect: full.is_external_relay_redirect,
manually_closed: full.manually_closed,
abnormally_closed: full.abnornally_closed,
created_at: full.created_at,
closed_at: full.closed_at,
}
Expand Down
11 changes: 5 additions & 6 deletions rs/packages/db/src/models/stream_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ pub mod lite;
pub mod stats;

#[allow(clippy::bool_comparison)]
fn is_false(b: &bool) -> bool {
*b == false
fn is_false(v: &bool) -> bool {
*v == false
}

crate::register!(StreamConnection);
Expand Down Expand Up @@ -46,10 +46,9 @@ pub struct StreamConnection {
#[serde(default)]
pub is_external_relay_redirect: bool,

#[serde(default)]
#[serde(skip_serializing_if = "is_false")]
#[serde(rename = "_manually_closed")]
pub manually_closed: bool,
#[serde(rename = "_m")]
#[serde(default, skip_serializing_if = "is_false")]
pub abnornally_closed: bool,

pub request: Request,
pub last_transfer_at: DateTime,
Expand Down
Loading

0 comments on commit fe3dce0

Please sign in to comment.