Skip to content

Commit

Permalink
feat: project stream analytics items and ws stats analytics items (#270)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Feb 6, 2024
2 parents fa42760 + 53d28dc commit 26f4fcf
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 17 deletions.
3 changes: 3 additions & 0 deletions defs/api/stream-connections/GET/Output.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@
"default": false,
"type": "boolean"
},
"_manually_closed": {
"type": "boolean"
},
"request": {
"type": "object",
"required": [
Expand Down
1 change: 1 addition & 0 deletions defs/db/StreamConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export type StreamConnection = {
country_code: CountryCode | null;
ip: string;
is_external_relay_redirect: boolean;
_manually_closed: boolean;
request: Request;
last_transfer_at: DateTime;
closed_at: DateTime | null;
Expand Down
1 change: 1 addition & 0 deletions defs/db/StreamConnectionLite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ export type StreamConnectionLite = {
os: string | null;
ca: DateTime;
re: boolean;
_m: boolean;
cl: DateTime | null;
};
105 changes: 99 additions & 6 deletions rs/packages/db/src/models/stream_connection/analytics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,93 @@ use ts_rs::TS;

use crate::{station::Station, stream_connection::lite::StreamConnectionLite, Model};

#[derive(Debug, Serialize, Deserialize)]
#[macros::keys]
pub struct Item {
// #[serde(rename = "_id")]
// pub id: String,
#[serde(rename = "st")]
pub station_id: String,

//#[serde(rename = "op")]
//pub is_open: bool,
#[serde(rename = "ip")]
#[serde(with = "serde_util::ip")]
pub ip: IpAddr,

#[serde(rename = "cc")]
pub country_code: Option<CountryCode>,

#[serde(rename = "du")]
#[serde(with = "serde_util::as_f64::option")]
pub duration_ms: Option<u64>,

#[serde(rename = "by")]
#[serde(with = "serde_util::as_f64::option")]
pub transfer_bytes: Option<u64>,

#[serde(rename = "br")]
pub browser: Option<String>,

#[serde(rename = "do")]
pub domain: Option<String>,

#[serde(rename = "os")]
pub os: Option<String>,

#[serde(rename = "ca")]
pub created_at: DateTime,
// #[serde(rename = "re")]
// #[serde(default)]
// #[serde(skip_serializing_if = "is_false")]
// pub is_external_relay_redirect: bool,

// #[serde(rename = "_m")]
// #[serde(default)]
// #[serde(skip_serializing_if = "is_false")]
// pub manually_closed: bool,

// #[serde(rename = "cl")]
// pub closed_at: Option<DateTime>,
}

impl Item {
fn projection() -> mongodb::bson::Document {
doc! {
crate::KEY_ID: 0,
Item::KEY_STATION_ID: 1,
Item::KEY_IP: 1,
Item::KEY_COUNTRY_CODE: 1,
Item::KEY_DURATION_MS: 1,
Item::KEY_TRANSFER_BYTES: 1,
Item::KEY_BROWSER: 1,
Item::KEY_DOMAIN: 1,
Item::KEY_OS: 1,
Item::KEY_CREATED_AT: 1,
}
}
}

#[cfg(test)]
#[test]
fn stream_connection_analytics_item_keys() {
assert_eq!(StreamConnectionLite::KEY_STATION_ID, Item::KEY_STATION_ID);
assert_eq!(StreamConnectionLite::KEY_IP, Item::KEY_IP);
assert_eq!(
StreamConnectionLite::KEY_COUNTRY_CODE,
Item::KEY_COUNTRY_CODE
);
assert_eq!(StreamConnectionLite::KEY_DURATION_MS, Item::KEY_DURATION_MS);
assert_eq!(
StreamConnectionLite::KEY_TRANSFER_BYTES,
Item::KEY_TRANSFER_BYTES
);
assert_eq!(StreamConnectionLite::KEY_BROWSER, Item::KEY_BROWSER);
assert_eq!(StreamConnectionLite::KEY_DOMAIN, Item::KEY_DOMAIN);
assert_eq!(StreamConnectionLite::KEY_OS, Item::KEY_OS);
assert_eq!(StreamConnectionLite::KEY_CREATED_AT, Item::KEY_CREATED_AT);
}

#[derive(Debug, Clone, Serialize, Deserialize, TS, JsonSchema)]
#[ts(export, export_to = "../../../defs/analytics/")]
pub struct Analytics {
Expand Down Expand Up @@ -273,7 +360,7 @@ impl Batch {
}

#[inline(always)]
pub fn add(&mut self, conn: StreamConnectionLite) {
pub fn add(&mut self, conn: Item) {
let created_at = conn.created_at.to_offset(self.offset);

let conn_duration_ms = conn
Expand Down Expand Up @@ -303,8 +390,9 @@ impl Batch {
#[cfg(feature = "analytics-max-concurrent")]
let stop = StartStopEvent::new(start_s + (conn_duration_ms / 1000) as u32, false);

if !conn.is_open {
#[cfg(feature = "analytics-max-concurrent")]
// duration_ms is None if the connection is still open
#[cfg(feature = "analytics-max-concurrent")]
if conn.duration_ms.is_some() {
self.start_stop_events.push(stop);
}

Expand All @@ -320,7 +408,7 @@ impl Batch {
item.start_stop_events.push(start);

#[cfg(feature = "analytics-max-concurrent")]
if !conn.is_open {
if conn.duration_ms.is_some() {
item.start_stop_events.push(stop);
}
};
Expand Down Expand Up @@ -625,10 +713,15 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
// .await?
// .with_type::<StreamConnectionLite>();

let options = mongodb::options::FindOptions::builder().sort(sort).build();
let options = mongodb::options::FindOptions::builder()
.sort(sort)
.projection(Item::projection())
.build();

let filter = doc! { "$and": and };
let mut cursor = StreamConnectionLite::cl().find(filter, options).await?;
let mut cursor = StreamConnectionLite::cl_as::<Item>()
.find(filter, options)
.await?;

let mut batch = Batch::new(offset_date.offset());

Expand Down
87 changes: 82 additions & 5 deletions rs/packages/db/src/models/stream_connection/app_analytics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,78 @@ use ts_rs::TS;

use crate::{station::Station, ws_stats_connection::WsStatsConnection, Model};

#[derive(Debug, Serialize, Deserialize)]
#[macros::keys]
pub struct Item {
// #[serde(rename = "_id")]
// pub id: String,
#[serde(rename = "st")]
pub station_id: String,

// #[serde(rename = "dp")]
// pub deployment_id: String,

// #[serde(with = "serde_util::as_f64::option")]
// pub transfer_bytes: Option<u64>,
#[serde(rename = "du")]
#[serde(with = "serde_util::as_f64::option")]
pub duration_ms: Option<u64>,

// #[serde(rename = "op")]
// pub is_open: bool,
#[serde(rename = "cc")]
pub country_code: Option<CountryCode>,

#[serde(rename = "ip")]
#[serde(with = "serde_util::ip")]
pub ip: IpAddr,

#[serde(rename = "ap")]
pub app_kind: Option<String>,

#[serde(rename = "av")]
#[serde(with = "serde_util::as_f64::option")]
pub app_version: Option<u32>,

// #[serde(rename = "re")]
// #[serde(with = "serde_util::as_f64")]
// pub reconnections: u16,
#[serde(rename = "ca")]
pub created_at: DateTime,
// pub request: Request,
// pub last_transfer_at: DateTime,

// #[serde(rename = "cl")]
// pub closed_at: Option<DateTime>,
}

impl Item {
pub fn projection() -> mongodb::bson::Document {
doc! {
crate::KEY_ID: 0,
WsStatsConnection::KEY_STATION_ID: 1,
WsStatsConnection::KEY_DURATION_MS: 1,
WsStatsConnection::KEY_COUNTRY_CODE: 1,
WsStatsConnection::KEY_IP: 1,
WsStatsConnection::KEY_APP_KIND: 1,
WsStatsConnection::KEY_APP_VERSION: 1,
WsStatsConnection::KEY_CREATED_AT: 1,
}
}
}

#[cfg(test)]
#[test]
fn ws_stat_item_keys() {
assert_eq!(Item::KEY_STATION_ID, WsStatsConnection::KEY_STATION_ID);
assert_eq!(Item::KEY_DURATION_MS, WsStatsConnection::KEY_DURATION_MS);
assert_eq!(Item::KEY_COUNTRY_CODE, WsStatsConnection::KEY_COUNTRY_CODE);
assert_eq!(Item::KEY_IP, WsStatsConnection::KEY_IP);
assert_eq!(Item::KEY_APP_KIND, WsStatsConnection::KEY_APP_KIND);
assert_eq!(Item::KEY_APP_VERSION, WsStatsConnection::KEY_APP_VERSION);
assert_eq!(Item::KEY_CREATED_AT, WsStatsConnection::KEY_CREATED_AT);
}

#[derive(Debug, Clone, Serialize, Deserialize, TS, JsonSchema)]
#[ts(export, export_to = "../../../defs/app-analytics/")]
pub struct Analytics {
Expand Down Expand Up @@ -288,7 +360,7 @@ impl Batch {
}

#[inline(always)]
pub fn add(&mut self, conn: WsStatsConnection) {
pub fn add(&mut self, conn: Item) {
let created_at = conn.created_at.to_offset(self.offset);

let conn_duration_ms = conn
Expand Down Expand Up @@ -322,7 +394,7 @@ impl Batch {
#[cfg(feature = "analytics-max-concurrent")]
let stop = StartStopEvent::new(start_s + (conn_duration_ms / 1000) as u32, false);

if !conn.is_open {
if conn.duration_ms.is_some() {
#[cfg(feature = "analytics-max-concurrent")]
self.start_stop_events.push(stop);
}
Expand All @@ -339,7 +411,7 @@ impl Batch {
item.start_stop_events.push(start);

#[cfg(feature = "analytics-max-concurrent")]
if !conn.is_open {
if conn.duration_ms.is_some() {
item.start_stop_events.push(stop);
}
};
Expand Down Expand Up @@ -657,10 +729,15 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
// .await?
// .with_type::<StreamConnectionLite>();

let options = mongodb::options::FindOptions::builder().sort(sort).build();
let options = mongodb::options::FindOptions::builder()
.sort(sort)
.projection(Item::projection())
.build();

let filter = doc! { "$and": and };
let mut cursor = WsStatsConnection::cl().find(filter, options).await?;
let mut cursor = WsStatsConnection::cl_as::<Item>()
.find(filter, options)
.await?;

let mut batch = Batch::new(offset_date.offset());

Expand Down
9 changes: 7 additions & 2 deletions rs/packages/db/src/models/stream_connection/lite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@ pub struct StreamConnectionLite {
#[serde(skip_serializing_if = "is_false")]
pub is_external_relay_redirect: bool,

#[serde(rename = "_m")]
#[serde(default)]
#[serde(skip_serializing_if = "is_false")]
pub manually_closed: bool,

#[serde(rename = "cl")]
pub closed_at: Option<DateTime>,
}

impl StreamConnectionLite {
pub const KET_MANUALLY_CLOSED: &'static str = "_m";

pub fn get_domain(full: &StreamConnection) -> Option<String> {
match full.request.headers.get("referer") {
None => None,
Expand All @@ -92,6 +95,7 @@ impl StreamConnectionLite {
duration_ms: full.duration_ms,
transfer_bytes: full.transfer_bytes,
is_external_relay_redirect: full.is_external_relay_redirect,
manually_closed: full.manually_closed,
created_at: full.created_at,
closed_at: full.closed_at,
}
Expand All @@ -112,6 +116,7 @@ impl From<StreamConnection> for StreamConnectionLite {
transfer_bytes: full.transfer_bytes,
country_code: full.country_code,
is_external_relay_redirect: full.is_external_relay_redirect,
manually_closed: full.manually_closed,
created_at: full.created_at,
closed_at: full.closed_at,
}
Expand Down
14 changes: 10 additions & 4 deletions rs/packages/db/src/models/stream_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ pub mod index;
pub mod lite;
pub mod stats;

#[allow(clippy::bool_comparison)]
fn is_false(b: &bool) -> bool {
*b == false
}

crate::register!(StreamConnection);

#[derive(Debug, Clone, Serialize, Deserialize, TS, JsonSchema)]
Expand Down Expand Up @@ -41,15 +46,16 @@ pub struct StreamConnection {
#[serde(default)]
pub is_external_relay_redirect: bool,

#[serde(default)]
#[serde(skip_serializing_if = "is_false")]
#[serde(rename = "_manually_closed")]
pub manually_closed: bool,

pub request: Request,
pub last_transfer_at: DateTime,
pub closed_at: Option<DateTime>,
}

impl StreamConnection {
pub const KEY_MANUALLY_CLOSED: &'static str = "_manually_closed";
}

impl Model for StreamConnection {
const CL_NAME: &'static str = "stream_connections";
const UID_LEN: usize = 12;
Expand Down
2 changes: 2 additions & 0 deletions rs/packages/stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ impl StreamHandler {
duration_ms: Some(0),
request,
is_external_relay_redirect: true,
manually_closed: false,
created_at: now,
last_transfer_at: now,
closed_at: Some(now),
Expand Down Expand Up @@ -446,6 +447,7 @@ impl StreamHandler {
request,
created_at: now,
is_external_relay_redirect: false,
manually_closed: false,
last_transfer_at: now,
closed_at: None,
}
Expand Down

0 comments on commit 26f4fcf

Please sign in to comment.