Skip to content

Commit

Permalink
fix: fix ws stats missing user id no accumulation error (#289)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Mar 10, 2024
2 parents 49caf39 + e93ef63 commit 2f1fc7f
Showing 1 changed file with 22 additions and 97 deletions.
119 changes: 22 additions & 97 deletions rs/packages/db/src/models/stream_connection/app_analytics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,24 @@ use futures_util::{StreamExt, TryStreamExt};
use geoip::CountryCode;
use mongodb::bson::doc;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use serde_util::{timezone_datetime::TimezoneDateTime, DateTime};
use time::{OffsetDateTime, UtcOffset};
use ts_rs::TS;

use crate::{station::Station, ws_stats_connection::WsStatsConnection, Model};

/// Serde helper that reads an optional string and yields a 64-bit hash of it
/// instead of the string itself.
///
/// Intended for use through `#[serde(deserialize_with = "deserialize_hash")]`.
///
/// # Returns
///
/// * `Ok(None)` when the deserialized value is absent / null.
/// * `Ok(Some(hash))` when a string was present, where `hash` is the
///   `DefaultHasher` digest of that string.
///
/// # Errors
///
/// Propagates any error produced by the underlying deserializer while reading
/// the `Option<String>`.
///
/// NOTE(review): the standard library does not specify `DefaultHasher`'s
/// algorithm across Rust releases, so these hashes are presumably only meant
/// for in-process comparison (e.g. set membership) and not for persistence —
/// confirm before storing them anywhere.
fn deserialize_hash<'de, D: Deserializer<'de>>(de: D) -> Result<Option<u64>, D::Error> {
    use std::hash::Hasher;

    let hashed = match Option::<String>::deserialize(de)? {
        None => None,
        Some(raw) => {
            let mut state = std::collections::hash_map::DefaultHasher::new();
            // Fully-qualified call: identical to `raw.hash(&mut state)`, but it
            // does not depend on the `Hash` trait being imported at file level.
            std::hash::Hash::hash(&raw, &mut state);
            Some(state.finish())
        }
    };

    Ok(hashed)
}

#[derive(Debug, Serialize, Deserialize)]
#[macros::keys]
pub struct Item {
Expand Down Expand Up @@ -50,7 +61,8 @@ pub struct Item {
pub app_version: Option<u32>,

#[serde(rename = "us")]
pub user_id: Option<String>,
#[serde(default, deserialize_with = "deserialize_hash")]
pub user_id: Option<u64>,

// #[serde(rename = "re")]
// #[serde(with = "serde_util::as_f64")]
Expand Down Expand Up @@ -140,9 +152,6 @@ pub struct Analytics {
pub by_hour: Option<Vec<AnalyticsItem<YearMonthDayHour>>>,
pub by_country: Vec<AnalyticsItem<Option<CountryCode>>>,
pub by_station: Vec<AnalyticsItem<String>>,
// pub by_browser: Vec<AnalyticsItem<Option<String>>>,
// pub by_domain: Vec<AnalyticsItem<Option<String>>>,
// pub by_os: Vec<AnalyticsItem<Option<String>>>,
pub by_app_kind: Vec<AnalyticsItem<Option<String>>>,
pub by_app_version: Vec<AnalyticsItem<AppKindVersion>>,
}
Expand Down Expand Up @@ -182,13 +191,6 @@ pub struct AnalyticsItem<K> {
pub max_concurrent_listeners_date: Option<serde_util::DateTime>,
}

// #[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, TS)]
// #[ts(export, export_to = "../../../defs/app-analytics/")]
// pub struct YearMonth {
// pub year: u16,
// pub month: u8,
// }

#[derive(
Debug, Clone, Copy, Serialize, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, TS, JsonSchema,
)]
Expand Down Expand Up @@ -234,9 +236,6 @@ pub struct AnalyticsQuery {
pub kind: AnalyticsQueryKind,
pub station_ids: Vec<String>,
pub country_code: Option<Option<CountryCode>>,
// pub browser: Option<Option<String>>,
// pub os: Option<Option<String>>,
// pub domain: Option<Option<String>>,
pub app_kind: Option<Option<String>>,
pub app_version: Option<Option<u32>>,
pub min_duration_ms: Option<u64>,
Expand All @@ -261,7 +260,7 @@ type KeyedAccumulatorMap<K> = HashMap<K, AccumulatorItem>;
struct AccumulatorItem {
sessions: u64,
ips: HashSet<IpAddr>,
users: HashSet<String>,
users: HashSet<u64>,
total_duration_ms: u64,
total_transfer_bytes: u64,
#[cfg(feature = "analytics-max-concurrent")]
Expand Down Expand Up @@ -336,7 +335,7 @@ struct Batch {

pub sessions: u64,
pub ips: HashSet<IpAddr>,
pub users: HashSet<String>,
pub users: HashSet<u64>,

pub total_duration_ms: u64,
pub total_transfer_bytes: u64,
Expand Down Expand Up @@ -370,9 +369,6 @@ impl Batch {
by_hour: Default::default(),
by_country: Default::default(),
by_station: Default::default(),
// by_browser: Default::default(),
// by_os: Default::default(),
// by_domain: Default::default(),
by_app_kind: Default::default(),
by_app_version: Default::default(),
#[cfg(feature = "analytics-max-concurrent")]
Expand Down Expand Up @@ -425,7 +421,12 @@ impl Batch {
($acc:expr, $key:expr) => {
let item = $acc.entry($key).or_default();
item.sessions += 1;

item.ips.insert(conn.ip);
if let Some(id) = conn.user_id {
item.users.insert(id);
}

item.total_duration_ms += conn_duration_ms;
// item.total_transfer_bytes += conn_transfer_bytes;

Expand Down Expand Up @@ -460,9 +461,6 @@ impl Batch {

add!(self.by_country, conn.country_code);
add!(self.by_station, conn.station_id);
// add!(self.by_browser, conn_browser);
// add!(self.by_os, conn_os);
// add!(self.by_domain, conn.domain);
add!(self.by_app_kind, conn.app_kind);
add!(self.by_app_version, conn_kind_version);
}
Expand All @@ -473,13 +471,11 @@ impl Batch {
self.total_duration_ms += dst.total_duration_ms;
self.total_transfer_bytes += dst.total_transfer_bytes;
self.ips.extend(dst.ips);
self.users.extend(dst.users);
merge_accumulator_maps(&mut self.by_day, dst.by_day);
merge_accumulator_maps(&mut self.by_hour, dst.by_hour);
merge_accumulator_maps(&mut self.by_country, dst.by_country);
merge_accumulator_maps(&mut self.by_station, dst.by_station);
// merge_accumulator_maps(&mut self.by_browser, dst.by_browser);
// merge_accumulator_maps(&mut self.by_os, dst.by_os);
// merge_accumulator_maps(&mut self.by_domain, dst.by_domain);
merge_accumulator_maps(&mut self.by_app_kind, dst.by_app_kind);
merge_accumulator_maps(&mut self.by_app_version, dst.by_app_version);

Expand Down Expand Up @@ -580,16 +576,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
with_hours =
(end_date.unix_timestamp() - start_date.unix_timestamp()) < (60 * 60 * 24 * 32);

// let ser_start_date: serde_util::DateTime = start_date.into();
// let ser_end_date: serde_util::DateTime = end_date.into();

// and.push(doc! {
// WsStatsConnection::KEY_CREATED_AT: {
// "$gte": ser_start_date,
// "$lt": ser_end_date,
// }
// });

start_end_date = Some((start_date, end_date));

offset_date = start_date;
Expand All @@ -603,18 +589,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
});
}

// if let Some(os) = query.os {
// and.push(doc! { WsStatsConnection::KEY_OS: os });
// }

// if let Some(browser) = query.browser {
// and.push(doc! { WsStatsConnection::KEY_BROWSER: browser });
// }

// if let Some(domain) = query.domain {
// and.push(doc! { WsStatsConnection::KEY_DOMAIN: domain });
// }

if let Some(app_kind) = query.app_kind {
and.push(doc! { WsStatsConnection::KEY_APP_KIND: app_kind });
}
Expand All @@ -632,20 +606,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
});
}

// let (count, count_ms) = {
// let start = Instant::now();
// let filter = doc! { "$and": &and };
// let count = WsStatsConnection::cl()
// .count_documents(filter, None)
// .await?;

// let ms = start.elapsed().as_millis();
// (count, ms)
// };

// let filter = doc!{ "$and": and };
// let mut cursor = WsStatsConnection::cl().find(filter, options).await?;

let mut first_last_and = and.clone();
if let Some((start_date, end_date)) = start_end_date {
let start_date: DateTime = start_date.into();
Expand Down Expand Up @@ -681,7 +641,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
ips: 0,
users: 0,
total_duration_ms: 0,
// total_transfer_bytes: 0,
#[cfg(feature = "analytics-max-concurrent")]
max_concurrent_listeners: 0,
#[cfg(feature = "analytics-max-concurrent")]
Expand All @@ -690,9 +649,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
by_hour: None,
by_country: vec![],
by_station: vec![],
// by_browser: vec![],
// by_os: vec![],
// by_domain: vec![],
by_app_kind: vec![],
by_app_version: vec![],
});
Expand All @@ -716,9 +672,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
if !is_now {
// let step = time::Duration::milliseconds(step as i64);
// let start: DateTime = (*first_ts + step * i as u16).into();
// let end: DateTime = (*start + step).into();
let start_ms = first_ms + step * i as u64;
let end_ms = first_ms + step * (i + 1) as u64;

Expand All @@ -739,19 +692,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::

let sort = doc! { WsStatsConnection::KEY_CREATED_AT: 1 };

// let options = mongodb::options::AggregateOptions::builder().build();

// let pipeline = vec![
// doc! { "$match": { "$and": and } },
// //doc! { "$sample": { "size": 100_000 } },
// doc! { "$sort": sort },
// ];

// let mut cursor = WsStatsConnection::cl()
// .find(pipeline, options)
// .await?
// .with_type::<StreamConnectionLite>();

let options = mongodb::options::FindOptions::builder()
.sort(sort)
.projection(Item::projection())
Expand Down Expand Up @@ -799,8 +739,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
use rayon::prelude::*;

let mut vec = $vec;
//vec.sort_unstable_by(|a, b| a.0.cmp(&b.0));
//vec.par_sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
vec.par_sort_unstable();

let mut max: u32 = 0;
Expand Down Expand Up @@ -856,15 +794,10 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
};
}

// collect
// let mut by_month = collect!(months_accumulator);
let mut by_day = collect!(batch.by_day);
let mut by_hour = collect!(batch.by_hour);
let mut by_country = collect!(batch.by_country);
let mut by_station = collect!(batch.by_station);
// let mut by_browser = collect!(batch.by_browser);
// let mut by_os = collect!(batch.by_os);
// let mut by_domain = collect!(batch.by_domain);
let mut by_app_kind = collect!(batch.by_app_kind);
let mut by_app_version = collect!(batch.by_app_version);

Expand All @@ -881,15 +814,11 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
};
}

// sort_by_key!(by_month);
sort_by_key!(by_day);
sort_by_key!(by_hour);

sort_by_sessions!(by_country);
sort_by_sessions!(by_station);
// sort_by_sessions!(by_browser);
// sort_by_sessions!(by_os);
// sort_by_sessions!(by_domain);
sort_by_sessions!(by_app_kind);
sort_by_sessions!(by_app_version);

Expand Down Expand Up @@ -922,7 +851,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
utc_offset_minutes: offset_date.offset().whole_minutes(),
sessions: batch.sessions,
total_duration_ms: batch.total_duration_ms,
// total_transfer_bytes: batch.total_transfer_bytes,
ips: batch.ips.len() as u64,
users: batch.users.len() as u64,
#[cfg(feature = "analytics-max-concurrent")]
Expand All @@ -934,9 +862,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::
by_hour,
by_country,
by_station,
// by_browser,
// by_os,
// by_domain,
by_app_kind,
by_app_version,
};
Expand Down

0 comments on commit 2f1fc7f

Please sign in to comment.