From e93ef63f0184593b743b41f686c081a38da6104b Mon Sep 17 00:00:00 2001 From: ramiroaisen <52116153+ramiroaisen@users.noreply.github.com> Date: Sun, 10 Mar 2024 11:40:32 -0300 Subject: [PATCH] fix: fix status missing user id accumulate --- .../stream_connection/app_analytics/mod.rs | 119 ++++-------------- 1 file changed, 22 insertions(+), 97 deletions(-) diff --git a/rs/packages/db/src/models/stream_connection/app_analytics/mod.rs b/rs/packages/db/src/models/stream_connection/app_analytics/mod.rs index 9bc4c180..db318bb1 100644 --- a/rs/packages/db/src/models/stream_connection/app_analytics/mod.rs +++ b/rs/packages/db/src/models/stream_connection/app_analytics/mod.rs @@ -9,13 +9,24 @@ use futures_util::{StreamExt, TryStreamExt}; use geoip::CountryCode; use mongodb::bson::doc; use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use serde_util::{timezone_datetime::TimezoneDateTime, DateTime}; use time::{OffsetDateTime, UtcOffset}; use ts_rs::TS; use crate::{station::Station, ws_stats_connection::WsStatsConnection, Model}; +fn deserialize_hash<'de, D: Deserializer<'de>>(de: D) -> Result, D::Error> { + let s = Option::::deserialize(de)?; + let v = s.map(|s| { + use std::hash::Hasher; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + s.hash(&mut hasher); + hasher.finish() + }); + Ok(v) +} + #[derive(Debug, Serialize, Deserialize)] #[macros::keys] pub struct Item { @@ -50,7 +61,8 @@ pub struct Item { pub app_version: Option, #[serde(rename = "us")] - pub user_id: Option, + #[serde(default, deserialize_with = "deserialize_hash")] + pub user_id: Option, // #[serde(rename = "re")] // #[serde(with = "serde_util::as_f64")] @@ -140,9 +152,6 @@ pub struct Analytics { pub by_hour: Option>>, pub by_country: Vec>>, pub by_station: Vec>, - // pub by_browser: Vec>>, - // pub by_domain: Vec>>, - // pub by_os: Vec>>, pub by_app_kind: Vec>>, pub by_app_version: Vec>, } @@ -182,13 +191,6 @@ pub struct AnalyticsItem { pub max_concurrent_listeners_date: Option, } -// #[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, TS)] -// #[ts(export, export_to = "../../../defs/app-analytics/")] -// pub struct YearMonth { -// pub year: u16, -// pub month: u8, -// } - #[derive( Debug, Clone, Copy, Serialize, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, TS, JsonSchema, )] @@ -234,9 +236,6 @@ pub struct AnalyticsQuery { pub kind: AnalyticsQueryKind, pub station_ids: Vec, pub country_code: Option>, - // pub browser: Option>, - // pub os: Option>, - // pub domain: Option>, pub app_kind: Option>, pub app_version: Option>, pub min_duration_ms: Option, @@ -261,7 +260,7 @@ type KeyedAccumulatorMap = HashMap; struct AccumulatorItem { sessions: u64, ips: HashSet, - users: HashSet, + users: HashSet, total_duration_ms: u64, total_transfer_bytes: u64, #[cfg(feature = "analytics-max-concurrent")] @@ -336,7 +335,7 @@ struct Batch { pub sessions: u64, pub ips: HashSet, - pub users: HashSet, + pub users: HashSet, pub total_duration_ms: u64, pub total_transfer_bytes: u64, @@ -370,9 +369,6 @@ impl Batch { by_hour: Default::default(), by_country: Default::default(), by_station: Default::default(), - // by_browser: Default::default(), - // by_os: Default::default(), - // by_domain: Default::default(), by_app_kind: Default::default(), by_app_version: Default::default(), #[cfg(feature = "analytics-max-concurrent")] @@ -425,7 +421,12 @@ impl Batch { ($acc:expr, $key:expr) => { let item = $acc.entry($key).or_default(); item.sessions += 1; + item.ips.insert(conn.ip); + if let Some(id) = conn.user_id { + item.users.insert(id); + } + item.total_duration_ms += conn_duration_ms; // item.total_transfer_bytes += conn_transfer_bytes; @@ -460,9 +461,6 @@ impl Batch { add!(self.by_country, conn.country_code); add!(self.by_station, conn.station_id); - // add!(self.by_browser, conn_browser); - // add!(self.by_os, conn_os); - // add!(self.by_domain, conn.domain); add!(self.by_app_kind, conn.app_kind); add!(self.by_app_version, conn_kind_version); } @@ -473,13 +471,11 @@ impl Batch { self.total_duration_ms += dst.total_duration_ms; self.total_transfer_bytes += dst.total_transfer_bytes; self.ips.extend(dst.ips); + self.users.extend(dst.users); merge_accumulator_maps(&mut self.by_day, dst.by_day); merge_accumulator_maps(&mut self.by_hour, dst.by_hour); merge_accumulator_maps(&mut self.by_country, dst.by_country); merge_accumulator_maps(&mut self.by_station, dst.by_station); - // merge_accumulator_maps(&mut self.by_browser, dst.by_browser); - // merge_accumulator_maps(&mut self.by_os, dst.by_os); - // merge_accumulator_maps(&mut self.by_domain, dst.by_domain); merge_accumulator_maps(&mut self.by_app_kind, dst.by_app_kind); merge_accumulator_maps(&mut self.by_app_version, dst.by_app_version); @@ -580,16 +576,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result Result Result Result Result Result Result(); - let options = mongodb::options::FindOptions::builder() .sort(sort) .projection(Item::projection()) @@ -799,8 +739,6 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result Result Result Result Result