diff --git a/defs/api/app-analytics/GET/CountryCodeOrZZ.ts b/defs/api/app-analytics/GET/CountryCodeOrZZ.ts new file mode 100644 index 00000000..3e594c3f --- /dev/null +++ b/defs/api/app-analytics/GET/CountryCodeOrZZ.ts @@ -0,0 +1,5 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { CountryCode } from "../../../CountryCode"; +import type { ZZ } from "./ZZ"; + +export type CountryCodeOrZZ = ZZ | CountryCode; diff --git a/defs/api/app-analytics/GET/Output.ts b/defs/api/app-analytics/GET/Output.ts new file mode 100644 index 00000000..51d99836 --- /dev/null +++ b/defs/api/app-analytics/GET/Output.ts @@ -0,0 +1,4 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { Analytics } from "../../../app-analytics/Analytics"; + +export type Output = { analytics: Analytics }; diff --git a/defs/api/app-analytics/GET/Query.ts b/defs/api/app-analytics/GET/Query.ts new file mode 100644 index 00000000..45c3dded --- /dev/null +++ b/defs/api/app-analytics/GET/Query.ts @@ -0,0 +1,12 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { AnalyticsQueryKind } from "../../../app-analytics/AnalyticsQueryKind"; +import type { CountryCodeOrZZ } from "./CountryCodeOrZZ"; + +export type Query = { + kind: AnalyticsQueryKind; + stations: string[] | undefined; + app_kind: string | null; + app_version: string | null; + country_code: CountryCodeOrZZ | null; + min_duration_ms?: number; +}; diff --git a/defs/api/app-analytics/GET/ZZ.ts b/defs/api/app-analytics/GET/ZZ.ts new file mode 100644 index 00000000..24bfdb6e --- /dev/null +++ b/defs/api/app-analytics/GET/ZZ.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ZZ = "ZZ"; diff --git a/defs/app-analytics/Analytics.ts b/defs/app-analytics/Analytics.ts new file mode 100644 index 00000000..803063c1 --- /dev/null +++ b/defs/app-analytics/Analytics.ts @@ -0,0 +1,29 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { AnalyticsItem } from "./AnalyticsItem"; +import type { AnalyticsQueryKind } from "./AnalyticsQueryKind"; +import type { AnalyticsStation } from "./AnalyticsStation"; +import type { AppKindVersion } from "./AppKindVersion"; +import type { CountryCode } from "../CountryCode"; +import type { DateTime } from "../DateTime"; +import type { YearMonthDay } from "./YearMonthDay"; +import type { YearMonthDayHour } from "./YearMonthDayHour"; + +export type Analytics = { + is_now: boolean; + kind: AnalyticsQueryKind; + stations: Array; + since: /** time::DateTime */ string; + until: /** time::DateTime */ string; + utc_offset_minutes: number; + sessions: number; + ips: number; + total_duration_ms: number; + max_concurrent_listeners?: number; + max_concurrent_listeners_date?: DateTime; + by_day: Array>; + by_hour: Array> | null; + by_country: Array>; + by_station: Array>; + by_app_kind: Array>; + by_app_version: Array>; +}; diff --git a/defs/app-analytics/AnalyticsItem.ts b/defs/app-analytics/AnalyticsItem.ts new file mode 100644 index 00000000..377b1be3 --- /dev/null +++ b/defs/app-analytics/AnalyticsItem.ts @@ -0,0 +1,12 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { DateTime } from "../DateTime"; + +export type AnalyticsItem = { + key: K; + sessions: number; + ips: number; + total_duration_ms: number; + total_transfer_bytes: number; + max_concurrent_listeners?: number; + max_concurrent_listeners_date?: DateTime; +}; diff --git a/defs/app-analytics/AnalyticsQueryKind.ts b/defs/app-analytics/AnalyticsQueryKind.ts new file mode 100644 index 00000000..b93aeb78 --- /dev/null +++ b/defs/app-analytics/AnalyticsQueryKind.ts @@ -0,0 +1,10 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type AnalyticsQueryKind = { + now: { offset_date: /** time::DateTime */ string }; +} | { + time_range: { + since: /** time::DateTime */ string; + until: /** time::DateTime */ string; + }; +}; diff --git a/defs/app-analytics/AnalyticsStation.ts b/defs/app-analytics/AnalyticsStation.ts new file mode 100644 index 00000000..5f563ce5 --- /dev/null +++ b/defs/app-analytics/AnalyticsStation.ts @@ -0,0 +1,8 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { DateTime } from "../DateTime"; + +export type AnalyticsStation = { + _id: string; + name: string; + created_at: DateTime; +}; diff --git a/defs/app-analytics/AppKindVersion.ts b/defs/app-analytics/AppKindVersion.ts new file mode 100644 index 00000000..da63ee07 --- /dev/null +++ b/defs/app-analytics/AppKindVersion.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type AppKindVersion = { kind: string | null; version: number | null }; diff --git a/defs/app-analytics/YearMonthDay.ts b/defs/app-analytics/YearMonthDay.ts new file mode 100644 index 00000000..8d026a4e --- /dev/null +++ b/defs/app-analytics/YearMonthDay.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type YearMonthDay = { year: number; month: number; day: number }; diff --git a/defs/app-analytics/YearMonthDayHour.ts b/defs/app-analytics/YearMonthDayHour.ts new file mode 100644 index 00000000..57989ab6 --- /dev/null +++ b/defs/app-analytics/YearMonthDayHour.ts @@ -0,0 +1,8 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type YearMonthDayHour = { + year: number; + month: number; + day: number; + hour: number; +}; diff --git a/front/admin/src/routes/(online)/(app)/analytics/+page.svelte b/front/admin/src/routes/(online)/(app)/analytics/+page.svelte index ab91edd9..4a3ac3d5 100644 --- a/front/admin/src/routes/(online)/(app)/analytics/+page.svelte +++ b/front/admin/src/routes/(online)/(app)/analytics/+page.svelte @@ -1,6 +1,7 @@ @@ -61,9 +74,7 @@ - - Analytics - + Analytics item.account_id === data.account._id); + let type: "stream" | "app" = "stream"; let country_code: CountryCode | null | undefined = undefined; let os: string | null | undefined = undefined; let browser: string | null | undefined = undefined; @@ -16,27 +19,35 @@ let kind: QueryKind = "last-30d"; let selected_stations: StationItem[] | "all" = "all"; let loading: boolean = false; - let analytics_data: import("$server/defs/analytics/Analytics").Analytics | null = null; - + let app_kind: string | null | undefined = undefined; + let app_version: number | null | undefined = undefined; + let analytics_data: Data | null = null; + type Snapshot = { + type: "stream" | "app", + kind: QueryKind, country_code: CountryCode | null | undefined, os: string | null | undefined, browser: string | null | undefined, domain: string | null | undefined, - kind: QueryKind, + app_kind: string | null | undefined, + app_version: number | null | undefined, selected_stations: StationItem[] | "all", - analytics_data: import("$server/defs/analytics/Analytics").Analytics | null, + analytics_data: Data | null, }; export const snapshot = { capture: (): Snapshot => { return { + type, analytics_data, country_code, os, browser, kind, domain, + app_kind, + app_version, selected_stations } }, @@ -49,6 +60,8 @@ country_code, os, domain, + app_kind, + app_version, selected_stations, } = snapshot); } @@ -60,10 +73,13 @@ -

{$locale.pages["account/analytics"].title}

+

+ {$locale.pages["account/analytics"].title} +

+ />
\ No newline at end of file diff --git a/front/server/src/api/shared-api.ts b/front/server/src/api/shared-api.ts index 9c4d871e..8ed5b08b 100644 --- a/front/server/src/api/shared-api.ts +++ b/front/server/src/api/shared-api.ts @@ -305,6 +305,10 @@ export const shared_api = ({ return await client.analytics.get(ip(req), ua(req), get_token(req), req.query as any); })) + api.route("/app-analytics").get(json(async req => { + return await client.app_analytics.get(ip(req), ua(req), get_token(req), req.query as any); + })) + api.route("/invitations") .get(json(async req => { return await client.invitations.list(ip(req), ua(req), get_token(req), req.query as any) diff --git a/front/server/src/client.ts b/front/server/src/client.ts index b4e08bb0..6abc53fb 100644 --- a/front/server/src/client.ts +++ b/front/server/src/client.ts @@ -30,6 +30,7 @@ export class Client { accounts: Accounts; stations: Stations; analytics: Analytics; + app_analytics: AppAnalytics; invitations: AccountInvitations; payment_methods: PaymentMethods; stream_connections: StreamConnections; @@ -48,6 +49,7 @@ export class Client { this.accounts = new Accounts(this); this.stations = new Stations(this); this.analytics = new Analytics(this); + this.app_analytics = new AppAnalytics(this); this.invitations = new AccountInvitations(this); this.payment_methods = new PaymentMethods(this); this.stream_connections = new StreamConnections(this); @@ -635,6 +637,18 @@ export class Analytics { } } +export class AppAnalytics { + client: Client; + constructor(client: Client) { + this.client = client; + } + + async get(ip: string | null, ua: string | null, token: string, query: import("$api/app-analytics/GET/Query").Query): Promise { + const url = `/app-analytics${qss(query)}`; + return await this.client.get(ip, ua, token, url); + } +} + export class PaymentMethods { client: Client; constructor(client: Client) { diff --git a/front/share/src/analytics/Analytics.svelte b/front/share/src/analytics/Analytics.svelte index 685f06bb..7c3679bc 100644 --- a/front/share/src/analytics/Analytics.svelte +++ b/front/share/src/analytics/Analytics.svelte @@ -1,7 +1,9 @@ + + + +
+
+ {#if type === "stream"} +
+ {/if} + +
+
+ {#if type === "app"} +
+ {/if} + +
+
\ No newline at end of file diff --git a/rs/packages/api/src/routes/app_analytics/mod.rs b/rs/packages/api/src/routes/app_analytics/mod.rs new file mode 100644 index 00000000..2cb6d2ca --- /dev/null +++ b/rs/packages/api/src/routes/app_analytics/mod.rs @@ -0,0 +1,245 @@ +use crate::json::JsonHandler; +use crate::request_ext::{self, GetAccessTokenScopeError}; + +use async_trait::async_trait; +use mongodb::bson::doc; +use prex::Request; +use serde::{Deserialize, Serialize}; +use ts_rs::TS; + +pub mod get { + + use crate::{error::ApiError, request_ext::AccessTokenScope}; + use db::stream_connection::app_analytics; + use db::Model; + use db::{station::Station, stream_connection::app_analytics::Analytics}; + use geoip::CountryCode; + use mongodb::bson::Bson; + + use super::*; + + #[derive(Debug, Clone)] + pub struct Endpoint {} + + #[derive(Debug, Clone)] + pub struct Input { + pub access_token_scope: AccessTokenScope, + pub query: Query, + } + + #[derive(Debug, Clone, Serialize, Deserialize, TS)] + #[ts(export)] + #[ts(export_to = "../../../defs/api/app-analytics/GET/")] + pub struct Output { + pub analytics: Analytics, + } + + #[derive(Debug, Clone, Serialize, Deserialize, TS)] + #[ts(export)] + #[ts(export_to = "../../../defs/api/app-analytics/GET/")] + #[serde(untagged)] + pub enum CountryCodeOrZZ { + ZZ(ZZ), + CC(CountryCode), + } + + #[derive(Debug, Clone, Serialize, Deserialize, TS)] + #[ts(export)] + #[ts(export_to = "../../../defs/api/app-analytics/GET/")] + pub enum ZZ { + ZZ, + } + + #[derive(Debug, Clone, Serialize, Deserialize, TS)] + #[ts(export)] + #[ts(export_to = "../../../defs/api/app-analytics/GET/")] + pub struct Query { + pub kind: app_analytics::AnalyticsQueryKind, + + #[serde(default)] + #[ts(type = "string[] | undefined")] + /// ommiting this value means all available stations + /// for the current access scope (this is valid only for admin and global access token scopes) + pub stations: Vec, + + // #[serde(default)] + // pub browser: Option, + + // #[serde(default)] + // pub os: Option, + + // #[serde(default)] + // pub domain: Option, + #[serde(default)] + pub app_kind: Option, + + // map to u32 + #[serde(default)] + pub app_version: Option, + + #[serde(default)] + pub country_code: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub min_duration_ms: Option, + } + + #[derive(Debug, thiserror::Error)] + pub enum ParseError { + #[error("token: {0}")] + Token(#[from] GetAccessTokenScopeError), + #[error("query: {0}")] + Query(#[from] serde_qs::Error), + } + + impl From for ApiError { + fn from(e: ParseError) -> Self { + match e { + ParseError::Token(e) => e.into(), + ParseError::Query(e) => e.into(), + } + } + } + + #[derive(Debug, thiserror::Error)] + pub enum HandleError { + #[error("token: {0}")] + Token(#[from] GetAccessTokenScopeError), + #[error("query: {0}")] + Db(#[from] mongodb::error::Error), + } + + impl From for ApiError { + fn from(e: HandleError) -> Self { + match e { + HandleError::Token(e) => e.into(), + HandleError::Db(e) => e.into(), + } + } + } + + #[async_trait] + impl JsonHandler for Endpoint { + type Input = Input; + type Output = Output; + type ParseError = ParseError; + type HandleError = HandleError; + + async fn parse(&self, req: Request) -> Result { + let access_token_scope = request_ext::get_access_token_scope(&req).await?; + let query: Query = req.qs()?; + + Ok(Input { + access_token_scope, + query, + }) + } + + async fn perform(&self, input: Input) -> Result { + let Input { + access_token_scope, + query, + } = input; + + let Query { + kind, + stations: station_ids, + country_code, + // browser, + // os, + // domain, + app_kind, + app_version, + min_duration_ms, + } = query; + + let station_ids = match access_token_scope { + AccessTokenScope::Global | AccessTokenScope::Admin(_) => { + if station_ids.is_empty() { + let values = Station::cl().distinct(Station::KEY_ID, None, None).await?; + values + .into_iter() + .filter_map(|v| { + match v { + Bson::String(s) => Some(s), + // this will never happen + _ => None, + } + }) + .collect::>() + } else { + station_ids + } + } + + AccessTokenScope::User(_) => { + if station_ids.is_empty() { + station_ids + } else { + for id in station_ids.iter() { + access_token_scope.grant_station_scope(id).await?; + } + station_ids + } + } + }; + + // let os = match os { + // None => None, + // Some(null) if null == "null" => Some(None), + // Some(os) => Some(Some(os)), + // }; + + // let browser = match browser { + // None => None, + // Some(null) if null == "null" => Some(None), + // Some(browser) => Some(Some(browser)), + // }; + + // let domain = match domain { + // None => None, + // Some(null) if null == "null" => Some(None), + // Some(domain) => Some(Some(domain)), + // }; + + let app_kind = match app_kind { + None => None, + Some(null) if null == "null" => Some(None), + Some(app_kind) => Some(Some(app_kind)), + }; + + let app_version = match app_version { + None => None, + Some(null) if null == "null" => Some(None), + Some(s) => match s.parse::() { + Ok(v) => Some(Some(v)), + Err(_) => None, + }, + }; + + let country_code = match country_code { + None => None, + Some(CountryCodeOrZZ::ZZ(_)) => Some(None), + Some(CountryCodeOrZZ::CC(cc)) => Some(Some(cc)), + }; + + let query = app_analytics::AnalyticsQuery { + kind, + station_ids, + country_code, + // os, + // browser, + // domain, + app_kind, + app_version, + min_duration_ms: Some(min_duration_ms.unwrap_or(5_000)), + }; + + let analytics = app_analytics::get_analytics(query).await?; + + let out = Output { analytics }; + + Ok(out) + } + } +} diff --git a/rs/packages/api/src/routes/mod.rs b/rs/packages/api/src/routes/mod.rs index 2f1a97de..b112d308 100644 --- a/rs/packages/api/src/routes/mod.rs +++ b/rs/packages/api/src/routes/mod.rs @@ -7,6 +7,7 @@ pub mod stations; pub mod users; pub mod analytics; +pub mod app_analytics; pub mod invitations; pub mod payment_methods; pub mod plans; @@ -140,6 +141,10 @@ pub fn router( .at("/analytics") .get(analytics::get::Endpoint {}.into_handler()); + app + .at("/app-analytics") + .get(app_analytics::get::Endpoint {}.into_handler()); + app.at("/stream-stats").get( stream_stats::get::Endpoint { index: stream_connections_index.clone(), diff --git a/rs/packages/api/src/ws_stats/routes/connection.rs b/rs/packages/api/src/ws_stats/routes/connection.rs index c901f452..899121a4 100644 --- a/rs/packages/api/src/ws_stats/routes/connection.rs +++ b/rs/packages/api/src/ws_stats/routes/connection.rs @@ -76,7 +76,8 @@ pub struct Query { #[ts(optional)] #[serde(skip_serializing_if = "Option::is_none")] - app_version: Option, + #[serde(with = "serde_util::as_f64::option")] + app_version: Option, } #[derive(Debug, Clone, Serialize, Deserialize, TS)] diff --git a/rs/packages/db/src/models/stream_connection/app_analytics/mod.rs b/rs/packages/db/src/models/stream_connection/app_analytics/mod.rs new file mode 100644 index 00000000..70f8f9f7 --- /dev/null +++ b/rs/packages/db/src/models/stream_connection/app_analytics/mod.rs @@ -0,0 +1,857 @@ +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + hash::Hash, + net::IpAddr, + time::Instant, +}; + +use futures_util::{StreamExt, TryStreamExt}; +use geoip::CountryCode; +use mongodb::bson::doc; +use serde::{Deserialize, Serialize}; +use serde_util::DateTime; +use time::{OffsetDateTime, UtcOffset}; +use ts_rs::TS; + +use crate::{station::Station, ws_stats_connection::WsStatsConnection, Model}; + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[ts(export, export_to = "../../../defs/app-analytics/")] +pub struct Analytics { + pub is_now: bool, + + pub kind: AnalyticsQueryKind, + + pub stations: Vec, + + #[ts(type = "/** time::DateTime */ string")] + #[serde(with = "time::serde::iso8601")] + pub since: time::OffsetDateTime, + + #[ts(type = "/** time::DateTime */ string")] + #[serde(with = "time::serde::iso8601")] + pub until: time::OffsetDateTime, + + pub utc_offset_minutes: i16, + + #[serde(with = "serde_util::as_f64")] + pub sessions: u64, + + #[serde(with = "serde_util::as_f64")] + pub ips: u64, + + #[serde(with = "serde_util::as_f64")] + pub total_duration_ms: u64, + + // #[serde(with = "serde_util::as_f64")] + // pub total_transfer_bytes: u64, + #[cfg(feature = "analytics-max-concurrent")] + #[serde(with = "serde_util::as_f64")] + #[ts(optional)] + pub max_concurrent_listeners: u64, + + #[cfg(feature = "analytics-max-concurrent")] + #[ts(optional)] + pub max_concurrent_listeners_date: Option, + + pub by_day: Vec>, + pub by_hour: Option>>, + pub by_country: Vec>>, + pub by_station: Vec>, + // pub by_browser: Vec>>, + // pub by_domain: Vec>>, + // pub by_os: Vec>>, + pub by_app_kind: Vec>>, + pub by_app_version: Vec>, +} + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[ts(export, export_to = "../../../defs/app-analytics/")] +pub struct AnalyticsItem { + pub key: K, + #[serde(with = "serde_util::as_f64")] + pub sessions: u64, + #[serde(with = "serde_util::as_f64")] + pub ips: u64, + #[serde(with = "serde_util::as_f64")] + pub total_duration_ms: u64, + #[serde(with = "serde_util::as_f64")] + pub total_transfer_bytes: u64, + + #[cfg(feature = "analytics-max-concurrent")] + #[serde(with = "serde_util::as_f64")] + #[ts(optional)] + pub max_concurrent_listeners: u64, + + #[cfg(feature = "analytics-max-concurrent")] + #[ts(optional)] + pub max_concurrent_listeners_date: Option, +} + +// #[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, TS)] +// #[ts(export, export_to = "../../../defs/app-analytics/")] +// pub struct YearMonth { +// pub year: u16, +// pub month: u8, +// } + +#[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, TS)] +#[ts(export, export_to = "../../../defs/app-analytics/")] +pub struct YearMonthDay { + pub year: u16, + pub month: u8, + pub day: u8, +} + +#[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq, Hash, Ord, PartialOrd, Deserialize, TS)] +#[ts(export, export_to = "../../../defs/app-analytics/")] +pub struct YearMonthDayHour { + pub year: u16, + pub month: u8, + pub day: u8, + pub hour: u8, +} + +#[derive(Debug, Clone, Serialize, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, TS)] +#[ts(export, export_to = "../../../defs/app-analytics/")] +pub struct AppKindVersion { + pub kind: Option, + pub version: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[ts(export, export_to = "../../../defs/app-analytics/")] +#[macros::keys] +pub struct AnalyticsStation { + #[serde(rename = "_id")] + pub id: String, + pub name: String, + pub created_at: serde_util::DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AnalyticsQuery { + pub kind: AnalyticsQueryKind, + pub station_ids: Vec, + pub country_code: Option>, + // pub browser: Option>, + // pub os: Option>, + // pub domain: Option>, + pub app_kind: Option>, + pub app_version: Option>, + pub min_duration_ms: Option, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, ts_rs::TS)] +#[ts(export, export_to = "../../../defs/app-analytics/")] +pub enum AnalyticsQueryKind { + #[serde(rename = "now")] + Now { + #[ts(type = "/** time::DateTime */ string")] + #[serde(with = "time::serde::iso8601")] + offset_date: time::OffsetDateTime, + }, + + #[serde(rename = "time_range")] + TimeRange { + #[ts(type = "/** time::DateTime */ string")] + #[serde(with = "time::serde::iso8601")] + since: time::OffsetDateTime, + + #[ts(type = "/** time::DateTime */ string")] + #[serde(with = "time::serde::iso8601")] + until: time::OffsetDateTime, + }, +} + +type KeyedAccumulatorMap = HashMap; + +#[derive(Debug, Default)] +struct AccumulatorItem { + sessions: u64, + ips: HashSet, + total_duration_ms: u64, + total_transfer_bytes: u64, + #[cfg(feature = "analytics-max-concurrent")] + start_stop_events: Vec, +} + +impl AccumulatorItem { + #[inline(always)] + #[allow(unused)] + pub fn new() -> Self { + Self::default() + } + + #[inline(always)] + fn merge(&mut self, dst: Self) { + self.sessions += dst.sessions; + self.ips.extend(dst.ips); + self.total_duration_ms += dst.total_duration_ms; + self.total_transfer_bytes += dst.total_transfer_bytes; + + #[cfg(feature = "analytics-max-concurrent")] + self.start_stop_events.extend(dst.start_stop_events); + } +} + +#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash)] +struct StartStopEvent(u32); + +impl StartStopEvent { + const START: u32 = 0b_1111_1111_1111_1111_1111_1111_1111_1110; + const STOP: u32 = 0b_0000_0000_0000_0000_0000_0000_0000_0001; + + #[inline(always)] + pub fn new(timestamp: u32, start_stop: bool) -> Self { + let v = if start_stop { + timestamp & Self::START + } else { + timestamp | Self::STOP + }; + + Self(v) + } + + #[inline(always)] + pub fn is_start(self) -> bool { + self.0 & 1 == 0 + } +} + +#[inline(always)] +fn merge_accumulator_maps( + src: &mut KeyedAccumulatorMap, + dst: KeyedAccumulatorMap, +) { + for (key, value) in dst.into_iter() { + match src.entry(key) { + Entry::Vacant(entry) => { + entry.insert(value); + } + Entry::Occupied(mut entry) => { + entry.get_mut().merge(value); + } + } + } +} + +#[derive(Debug)] +struct Batch { + pub offset: UtcOffset, + pub now_ms: u64, + + pub sessions: u64, + pub total_duration_ms: u64, + pub total_transfer_bytes: u64, + pub ips: HashSet, + pub by_day: KeyedAccumulatorMap, + pub by_hour: KeyedAccumulatorMap, + pub by_app_kind: KeyedAccumulatorMap>, + pub by_app_version: KeyedAccumulatorMap, + // pub by_browser: KeyedAccumulatorMap>, + // pub by_os: KeyedAccumulatorMap>, + // pub by_domain: KeyedAccumulatorMap>, + pub by_country: KeyedAccumulatorMap>, + pub by_station: KeyedAccumulatorMap, + #[cfg(feature = "analytics-max-concurrent")] + pub start_stop_events: Vec, +} + +impl Batch { + #[inline(always)] + pub fn new(offset: UtcOffset) -> Self { + Self { + offset, + now_ms: OffsetDateTime::now_utc().unix_timestamp() as u64 * 1000, + + sessions: 0, + total_duration_ms: 0, + total_transfer_bytes: 0, + ips: Default::default(), + + by_day: Default::default(), + by_hour: Default::default(), + by_country: Default::default(), + by_station: Default::default(), + // by_browser: Default::default(), + // by_os: Default::default(), + // by_domain: Default::default(), + by_app_kind: Default::default(), + by_app_version: Default::default(), + #[cfg(feature = "analytics-max-concurrent")] + start_stop_events: Default::default(), + } + } + + #[inline(always)] + pub fn add(&mut self, conn: WsStatsConnection) { + let created_at = conn.created_at.to_offset(self.offset); + + let conn_duration_ms = conn + .duration_ms + .unwrap_or_else(|| (self.now_ms - created_at.unix_timestamp() as u64 * 1000)); + + // let conn_transfer_bytes = conn.transfer_bytes.unwrap_or(0); + let conn_year = created_at.year() as u16; + let conn_month = created_at.month() as u8; + let conn_day = created_at.day(); + let conn_hour = created_at.hour(); + // let conn_browser = conn.browser; + // let conn_os = conn.os; + let conn_kind_version = AppKindVersion { + kind: conn.app_kind.clone(), + version: conn.app_version, + }; + + self.sessions += 1; + self.total_duration_ms += conn_duration_ms; + // self.total_transfer_bytes += conn_transfer_bytes; + self.ips.insert(conn.ip); + + #[cfg(feature = "analytics-max-concurrent")] + let start_s = conn.created_at.unix_timestamp() as u32; + + #[cfg(feature = "analytics-max-concurrent")] + let start = StartStopEvent::new(start_s, true); + #[cfg(feature = "analytics-max-concurrent")] + self.start_stop_events.push(start); + #[cfg(feature = "analytics-max-concurrent")] + let stop = StartStopEvent::new(start_s + (conn_duration_ms / 1000) as u32, false); + + if !conn.is_open { + #[cfg(feature = "analytics-max-concurrent")] + self.start_stop_events.push(stop); + } + + macro_rules! add { + ($acc:expr, $key:expr) => { + let item = $acc.entry($key).or_default(); + item.sessions += 1; + item.ips.insert(conn.ip); + item.total_duration_ms += conn_duration_ms; + // item.total_transfer_bytes += conn_transfer_bytes; + + #[cfg(feature = "analytics-max-concurrent")] + item.start_stop_events.push(start); + + #[cfg(feature = "analytics-max-concurrent")] + if !conn.is_open { + item.start_stop_events.push(stop); + } + }; + } + + add!( + self.by_day, + YearMonthDay { + year: conn_year, + month: conn_month, + day: conn_day + } + ); + + add!( + self.by_hour, + YearMonthDayHour { + year: conn_year, + month: conn_month, + day: conn_day, + hour: conn_hour, + } + ); + + add!(self.by_country, conn.country_code); + add!(self.by_station, conn.station_id); + // add!(self.by_browser, conn_browser); + // add!(self.by_os, conn_os); + // add!(self.by_domain, conn.domain); + add!(self.by_app_kind, conn.app_kind); + add!(self.by_app_version, conn_kind_version); + } + + #[inline(always)] + pub fn merge(&mut self, dst: Self) { + self.sessions += dst.sessions; + self.total_duration_ms += dst.total_duration_ms; + self.total_transfer_bytes += dst.total_transfer_bytes; + self.ips.extend(dst.ips); + merge_accumulator_maps(&mut self.by_day, dst.by_day); + merge_accumulator_maps(&mut self.by_hour, dst.by_hour); + merge_accumulator_maps(&mut self.by_country, dst.by_country); + merge_accumulator_maps(&mut self.by_station, dst.by_station); + // merge_accumulator_maps(&mut self.by_browser, dst.by_browser); + // merge_accumulator_maps(&mut self.by_os, dst.by_os); + // merge_accumulator_maps(&mut self.by_domain, dst.by_domain); + merge_accumulator_maps(&mut self.by_app_kind, dst.by_app_kind); + merge_accumulator_maps(&mut self.by_app_version, dst.by_app_version); + + #[cfg(feature = "analytics-max-concurrent")] + self.start_stop_events.extend(dst.start_stop_events); + } +} + +pub async fn get_analytics(query: AnalyticsQuery) -> Result { + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(async move { + let start = Instant::now(); + + let stations = { + let filter = doc! { + Station::KEY_ID: { + "$in": &query.station_ids, + } + }; + + let projection = doc! { + Station::KEY_ID: 1, + Station::KEY_NAME: 1, + Station::KEY_CREATED_AT: 1, + }; + + let sort = doc! { + Station::KEY_CREATED_AT: 1, + }; + + let options = mongodb::options::FindOptions::builder() + .projection(projection) + .sort(sort) + .build(); + + let stations: Vec = Station::cl_as::() + .find(filter, options) + .await? + .try_collect() + .await?; + + stations + }; + + let mut and = vec![doc! { + WsStatsConnection::KEY_STATION_ID: { + "$in": &query.station_ids, + } + }]; + + let offset_date: OffsetDateTime; + let kind = query.kind; + let is_now: bool; + let with_hours: bool; + + let mut start_end_date: Option<(OffsetDateTime, OffsetDateTime)> = None; + + match kind { + AnalyticsQueryKind::Now { offset_date: d } => { + is_now = true; + with_hours = false; + offset_date = OffsetDateTime::now_utc().to_offset(d.offset()); + + and.push(doc! { WsStatsConnection::KEY_IS_OPEN: true }); + } + + AnalyticsQueryKind::TimeRange { since, until } => { + is_now = false; + let mut start_date = since; + let mut end_date = (until).to_offset(since.offset()); + + let now = OffsetDateTime::now_utc(); + let first_station_created_at = stations + .first() + .map(|station| *station.created_at) + .unwrap_or_else(OffsetDateTime::now_utc); + + if start_date < first_station_created_at { + start_date = first_station_created_at.to_offset(start_date.offset()); + } + + if end_date < first_station_created_at { + end_date = first_station_created_at.to_offset(start_date.offset()); + } + + if end_date > now { + end_date = now.to_offset(end_date.offset()); + } + + if start_date > now { + start_date = now.to_offset(start_date.offset()); + } + + if start_date > end_date { + (start_date, end_date) = (end_date, start_date); + } + + with_hours = + (end_date.unix_timestamp() - start_date.unix_timestamp()) < (60 * 60 * 24 * 32); + + // let ser_start_date: serde_util::DateTime = start_date.into(); + // let ser_end_date: serde_util::DateTime = end_date.into(); + + // and.push(doc! { + // WsStatsConnection::KEY_CREATED_AT: { + // "$gte": ser_start_date, + // "$lt": ser_end_date, + // } + // }); + + start_end_date = Some((start_date, end_date)); + + offset_date = start_date; + } + } + + if let Some(cc) = query.country_code { + and.push(doc! { + // this convertion should never fail + WsStatsConnection::KEY_COUNTRY_CODE: mongodb::bson::to_bson(&cc).unwrap(), + }); + } + + // if let Some(os) = query.os { + // and.push(doc! { WsStatsConnection::KEY_OS: os }); + // } + + // if let Some(browser) = query.browser { + // and.push(doc! { WsStatsConnection::KEY_BROWSER: browser }); + // } + + // if let Some(domain) = query.domain { + // and.push(doc! { WsStatsConnection::KEY_DOMAIN: domain }); + // } + + if let Some(app_kind) = query.app_kind { + and.push(doc! { WsStatsConnection::KEY_APP_KIND: app_kind }); + } + + if let Some(app_version) = query.app_version { + and.push(doc! { WsStatsConnection::KEY_APP_VERSION: app_version }); + } + + if let Some(d) = query.min_duration_ms { + and.push(doc! { + "$or": [ + { WsStatsConnection::KEY_DURATION_MS: null }, + { WsStatsConnection::KEY_DURATION_MS: { "$gte": d as f64 } }, + ] + }); + } + + // let (count, count_ms) = { + // let start = Instant::now(); + // let filter = doc! { "$and": &and }; + // let count = WsStatsConnection::cl() + // .count_documents(filter, None) + // .await?; + + // let ms = start.elapsed().as_millis(); + // (count, ms) + // }; + + // let filter = doc!{ "$and": and }; + // let mut cursor = WsStatsConnection::cl().find(filter, options).await?; + + let mut first_last_and = and.clone(); + if let Some((start_date, end_date)) = start_end_date { + let start_date: DateTime = start_date.into(); + let end_date: DateTime = end_date.into(); + first_last_and.push( + doc! { WsStatsConnection::KEY_CREATED_AT: { "$gte": start_date, "$lt": end_date } }, + ); + } + + let get_bound = |direction: i32| { + let sort = doc! { WsStatsConnection::KEY_CREATED_AT: direction }; + async { + let filter = doc! { "$and": &first_last_and }; + let options = mongodb::options::FindOneOptions::builder() + .sort(sort) + .build(); + let item_option = WsStatsConnection::cl().find_one(filter, options).await?; + Ok::<_, mongodb::error::Error>(item_option) + } + }; + + let (first, last) = match tokio::try_join!(get_bound(1), get_bound(-1))? { + (Some(first), Some(last)) => (first, last), + _ => { + return Ok(Analytics { + is_now, + kind, + stations, + since: offset_date, + until: offset_date, + utc_offset_minutes: offset_date.offset().whole_minutes(), + sessions: 0, + ips: 0, + total_duration_ms: 0, + // total_transfer_bytes: 0, + #[cfg(feature = "analytics-max-concurrent")] + max_concurrent_listeners: 0, + #[cfg(feature = "analytics-max-concurrent")] + max_concurrent_listeners_date: None, + by_day: vec![], + by_hour: None, + by_country: vec![], + by_station: vec![], + // by_browser: vec![], + // by_os: vec![], + // by_domain: vec![], + by_app_kind: vec![], + by_app_version: vec![], + }); + } + }; + + let first_ms = first.created_at.unix_timestamp_nanos() as u64 / 1_000_000; + let last_ms = last.created_at.unix_timestamp_nanos() as u64 / 1_000_000; + + let accumulate_start = Instant::now(); + + let batches_n = if is_now { 1 } else { 16 }; + let step = (last_ms - first_ms) / batches_n as u64; + let batch = { + futures_util::stream::repeat(()) + .take(batches_n) + .enumerate() + .map(|(i, ())| { + let mut and = and.clone(); + async move { + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(async move { + if !is_now { + // let step = time::Duration::milliseconds(step as i64); + // let start: DateTime = (*first_ts + step * i as u16).into(); + // let end: DateTime = (*start + step).into(); + let start_ms = first_ms + step * i as u64; + let end_ms = first_ms + step * (i + 1) as u64; + + let start = mongodb::bson::DateTime::from_millis(start_ms as i64); + let mut end = mongodb::bson::DateTime::from_millis(end_ms as i64); + + let is_last = (i + 1) == batches_n; + + let mut lt = "$lt"; + if is_last { + lt = "$lte"; + end = mongodb::bson::DateTime::from_millis(last_ms as i64); + } + + and + .push(doc! { WsStatsConnection::KEY_CREATED_AT: { "$gte": start, lt: end } }); + } + + let sort = doc! { WsStatsConnection::KEY_CREATED_AT: 1 }; + + // let options = mongodb::options::AggregateOptions::builder().build(); + + // let pipeline = vec![ + // doc! { "$match": { "$and": and } }, + // //doc! { "$sample": { "size": 100_000 } }, + // doc! { "$sort": sort }, + // ]; + + // let mut cursor = WsStatsConnection::cl() + // .find(pipeline, options) + // .await? + // .with_type::(); + + let options = mongodb::options::FindOptions::builder().sort(sort).build(); + + let filter = doc! { "$and": and }; + let mut cursor = WsStatsConnection::cl().find(filter, options).await?; + + let mut batch = Batch::new(offset_date.offset()); + + // accumulate + #[cfg(feature = "test-analytics-base-measure")] + while let Some(_conn) = cursor.try_next().await? { + batch.sessions += 1; + } + + #[cfg(not(feature = "test-analytics-base-measure"))] + while let Some(conn) = cursor.try_next().await? { + batch.add(conn); + } + + Ok::<_, mongodb::error::Error>(batch) + }) + }) + .await + .unwrap() + } + }) + .buffer_unordered(batches_n) + .try_fold(Batch::new(offset_date.offset()), |src, mut dst| async { + dst.merge(src); + Ok(dst) + }) + .await? + }; + + let accumulate_ms = accumulate_start.elapsed().as_millis(); + let sort_start = Instant::now(); + + #[cfg(feature = "analytics-max-concurrent")] + macro_rules! max_concurrent { + ($vec:expr) => {{ + use rayon::prelude::*; + + let mut vec = $vec; + //vec.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + //vec.par_sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1))); + vec.par_sort_unstable(); + + let mut max: u32 = 0; + let mut max_timestamp: u32 = 0; + let mut current: u32 = 0; + for event in vec.into_iter() { + if event.is_start() { + current = current.saturating_add(1); + if current > max { + max = current; + max_timestamp = event.0 + } + } else { + current = current.saturating_sub(1); + } + } + + let max_concurrent_listeners_date = if max_timestamp == 0 { + None + } else { + time::OffsetDateTime::from_unix_timestamp(max_timestamp as i64) + .ok() + .map(serde_util::DateTime::from) + }; + + (max as u64, max_concurrent_listeners_date) + }}; + } + + macro_rules! collect { + ($acc:expr) => { + $acc + .into_iter() + .map(|(key, value)| { + #[cfg(feature = "analytics-max-concurrent")] + let (max_concurrent_listeners, max_concurrent_listeners_date) = + max_concurrent!(value.start_stop_events); + + AnalyticsItem::<_> { + key, + sessions: value.sessions, + ips: value.ips.len() as u64, + total_duration_ms: value.total_duration_ms, + total_transfer_bytes: value.total_transfer_bytes, + #[cfg(feature = "analytics-max-concurrent")] + max_concurrent_listeners, + #[cfg(feature = "analytics-max-concurrent")] + max_concurrent_listeners_date, + } + }) + .collect::>() + }; + } + + // collect + // let mut by_month = collect!(months_accumulator); + let mut by_day = collect!(batch.by_day); + let mut by_hour = collect!(batch.by_hour); + let mut by_country = collect!(batch.by_country); + let mut by_station = collect!(batch.by_station); + // let mut by_browser = collect!(batch.by_browser); + // let mut by_os = collect!(batch.by_os); + // let mut by_domain = collect!(batch.by_domain); + let mut by_app_kind = collect!(batch.by_app_kind); + let mut by_app_version = collect!(batch.by_app_version); + + // sort + macro_rules! sort_by_key { + ($ident:ident) => { + $ident.sort_by(|a, b| a.key.cmp(&b.key)); + }; + } + + macro_rules! sort_by_sessions { + ($ident:ident) => { + $ident.sort_by(|a, b| b.sessions.cmp(&a.sessions)); + }; + } + + // sort_by_key!(by_month); + sort_by_key!(by_day); + sort_by_key!(by_hour); + + sort_by_sessions!(by_country); + sort_by_sessions!(by_station); + // sort_by_sessions!(by_browser); + // sort_by_sessions!(by_os); + // sort_by_sessions!(by_domain); + sort_by_sessions!(by_app_kind); + sort_by_sessions!(by_app_version); + + #[cfg(feature = "analytics-max-concurrent")] + let (max_concurrent_listeners, max_concurrent_listeners_date) = + max_concurrent!(batch.start_stop_events); + + let sort_ms = sort_start.elapsed().as_millis(); + + let since = first.created_at.to_offset(offset_date.offset()); + let until = last.created_at.to_offset(offset_date.offset()); + + log::info!( + target: "analytics", + "got analytics, processed {} connections in {}ms => {}ms acculumate, {}ms sort", + batch.sessions, + start.elapsed().as_millis(), + accumulate_ms, + sort_ms, + ); + + let by_hour = if with_hours { Some(by_hour) } else { None }; + + // render + let out = Analytics { + is_now, + kind, + since, + until, + utc_offset_minutes: offset_date.offset().whole_minutes(), + sessions: batch.sessions, + total_duration_ms: batch.total_duration_ms, + // total_transfer_bytes: batch.total_transfer_bytes, + ips: batch.ips.len() as u64, + #[cfg(feature = "analytics-max-concurrent")] + max_concurrent_listeners, + #[cfg(feature = "analytics-max-concurrent")] + max_concurrent_listeners_date, + stations, + by_day, + by_hour, + by_country, + by_station, + // by_browser, + // by_os, + // by_domain, + by_app_kind, + by_app_version, + }; + + Ok(out) + }) + }) + .await + .unwrap() +} + +#[cfg(test)] +pub mod test { + + use super::*; + + #[test] + fn analytics_station_and_db_station_have_the_same_key_names() { + assert_eq!(AnalyticsStation::KEY_ID, Station::KEY_ID); + assert_eq!(AnalyticsStation::KEY_NAME, Station::KEY_NAME); + assert_eq!(AnalyticsStation::KEY_CREATED_AT, Station::KEY_CREATED_AT); + } +} diff --git a/rs/packages/db/src/models/stream_connection/mod.rs b/rs/packages/db/src/models/stream_connection/mod.rs index a5c0dc2b..58928fc9 100644 --- a/rs/packages/db/src/models/stream_connection/mod.rs +++ b/rs/packages/db/src/models/stream_connection/mod.rs @@ -8,6 +8,7 @@ use std::net::IpAddr; use ts_rs::TS; pub mod analytics; +pub mod app_analytics; pub mod index; pub mod lite; pub mod stats; diff --git a/rs/packages/db/src/models/ws_stats_connection/mod.rs b/rs/packages/db/src/models/ws_stats_connection/mod.rs index cd40f6eb..5c63f421 100644 --- a/rs/packages/db/src/models/ws_stats_connection/mod.rs +++ b/rs/packages/db/src/models/ws_stats_connection/mod.rs @@ -42,7 +42,8 @@ pub struct WsStatsConnection { pub app_kind: Option, #[serde(rename = "av")] - pub app_version: Option, + #[serde(with = "serde_util::as_f64::option")] + pub app_version: Option, #[serde(rename = "re")] #[serde(with = "serde_util::as_f64")] @@ -73,15 +74,15 @@ impl Model for WsStatsConnection { .keys(doc! { Self::KEY_CREATED_AT: 1 }) .build(); - let created_at_station_id = IndexModel::builder() - .keys(doc! { Self::KEY_CREATED_AT: 1, Self::KEY_STATION_ID: 1 }) + let station_id_created_at = IndexModel::builder() + .keys(doc! { Self::KEY_STATION_ID: 1, Self::KEY_CREATED_AT: 1 }) .build(); let is_open = IndexModel::builder() .keys(doc! { Self::KEY_IS_OPEN: 1 }) .build(); - vec![station_id, created_at, created_at_station_id, is_open] + vec![station_id, created_at, station_id_created_at, is_open] } }