diff --git a/stats/.dockerignore b/stats/.dockerignore index ff66cc1f0..da63d323e 100644 --- a/stats/.dockerignore +++ b/stats/.dockerignore @@ -3,3 +3,4 @@ Dockerfile README.md tests config.toml +data diff --git a/stats/.gitignore b/stats/.gitignore new file mode 100644 index 000000000..8fce60300 --- /dev/null +++ b/stats/.gitignore @@ -0,0 +1 @@ +data/ diff --git a/stats/Cargo.lock b/stats/Cargo.lock index 7eb3dda79..b25dafebc 100644 --- a/stats/Cargo.lock +++ b/stats/Cargo.lock @@ -3469,6 +3469,7 @@ dependencies = [ "prost", "prost-build", "serde", + "serde_json", "serde_with", "tonic", "tonic-build", diff --git a/stats/docker-compose.dev.yml b/stats/docker-compose.dev.yml new file mode 100644 index 000000000..4451cb109 --- /dev/null +++ b/stats/docker-compose.dev.yml @@ -0,0 +1,122 @@ +version: '3.9' + +services: + db-init: + image: postgres:15 + volumes: + - ./data/blockscout-db:/var/lib/postgresql/data + entrypoint: + - sh + - -c + - | + chown -R 2000:2000 /var/lib/postgresql/data + + db: + depends_on: + db-init: + condition: service_completed_successfully + image: postgres:15 + user: 2000:2000 + shm_size: 256m + restart: always + container_name: 'db' + command: postgres -c 'max_connections=200' -c 'client_connection_check_interval=60000' + environment: + POSTGRES_DB: 'blockscout' + POSTGRES_USER: 'blockscout' + POSTGRES_PASSWORD: 'ceWb1MeLBEeOIfk65gU8EjF8' + ports: + - target: 5432 + published: 7432 + volumes: + - ./data/blockscout-db:/var/lib/postgresql/data + healthcheck: + test: [ "CMD-SHELL", "pg_isready -U blockscout -d blockscout" ] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + + backend: + depends_on: + - db + image: blockscout/blockscout:6.4.0 + links: + - db:database + # extra_hosts: + # - 'host.docker.internal:host-gateway' + environment: + DATABASE_URL: postgresql://blockscout:ceWb1MeLBEeOIfk65gU8EjF8@db:5432/blockscout + ETHEREUM_JSONRPC_VARIANT: erigon + ETHEREUM_JSONRPC_HTTP_URL: http://host.docker.internal:8545/ + 
ETHEREUM_JSONRPC_TRACE_URL: http://host.docker.internal:8545/ + FIRST_BLOCK: 5660029 + ECTO_USE_SSL: false + PORT: 4000 + ports: + - 80:4000 + command: + [ + "/bin/sh", + "-c", + "bin/blockscout eval \"Elixir.Explorer.ReleaseTasks.create_and_migrate()\" && bin/blockscout start" + ] + + stats-db-init: + image: postgres:15 + volumes: + - ./data/stats-db:/var/lib/postgresql/data + entrypoint: + - sh + - -c + - | + chown -R 2000:2000 /var/lib/postgresql/data + + stats-db: + depends_on: + stats-db-init: + condition: service_completed_successfully + image: postgres:15 + user: 2000:2000 + shm_size: 256m + restart: always + container_name: 'stats-db' + command: postgres -c 'max_connections=200' + environment: + POSTGRES_DB: 'stats' + POSTGRES_USER: 'stats' + POSTGRES_PASSWORD: 'n0uejXPl61ci6ldCuE2gQU5Y' + ports: + - target: 5432 + published: 7433 + volumes: + - ./data/stats-db:/var/lib/postgresql/data + healthcheck: + test: [ "CMD-SHELL", "pg_isready -U stats -d stats" ] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + + stats: + depends_on: + - stats-db + - backend + build: . 
+ pull_policy: always + platform: linux/amd64 + restart: always + container_name: 'stats' + # extra_hosts: + # - 'host.docker.internal:host-gateway' + environment: + - STATS__DB_URL=postgres://stats:n0uejXPl61ci6ldCuE2gQU5Y@stats-db:5432/stats + - STATS__BLOCKSCOUT_DB_URL=${STATS__BLOCKSCOUT_DB_URL:-postgresql://blockscout:ceWb1MeLBEeOIfk65gU8EjF8@db:5432/blockscout} + - STATS__CREATE_DATABASE=true + - STATS__RUN_MIGRATIONS=true + - STATS__SERVER__HTTP__ENABLED=true + - STATS__SERVER__HTTP__ADDR=0.0.0.0:8050 + - STATS__SERVER__HTTP__MAX_BODY_SIZE=2097152 + - RUST_BACKTRACE=1 + ports: + - 8080:8050 diff --git a/stats/justfile b/stats/justfile index 4d63486b8..e3971de3a 100644 --- a/stats/justfile +++ b/stats/justfile @@ -25,6 +25,8 @@ test *args: cargo test {{args}} -- --include-ignored test-with-db *args: + # remove db from previous run (if failed) + -just docker-name="{{docker-name}}-test" stop-postgres 2> /dev/null -just db-port="{{test-db-port}}" db-name="" docker-name="{{docker-name}}-test" start-postgres just db-port="{{test-db-port}}" db-name="" test {{args}} just docker-name="{{docker-name}}-test" stop-postgres diff --git a/stats/stats-proto/Cargo.toml b/stats/stats-proto/Cargo.toml index 768e0c958..9acc64adc 100644 --- a/stats/stats-proto/Cargo.toml +++ b/stats/stats-proto/Cargo.toml @@ -13,6 +13,9 @@ serde = { version = "1", features = ["derive"] } serde_with = { version = "2.0", features = ["hex", "base64"] } async-trait = "0.1" +[dev-dependencies] +serde_json = "1.0" + [build-dependencies] actix-prost-build = { git = "https://github.com/blockscout/actix-prost" } tonic-build = "0.8" diff --git a/stats/stats-proto/build.rs b/stats/stats-proto/build.rs index 1a1bbe580..a96d653a6 100644 --- a/stats/stats-proto/build.rs +++ b/stats/stats-proto/build.rs @@ -16,7 +16,9 @@ fn compile( .protoc_arg("--openapiv2_opt") .protoc_arg("grpc_api_configuration=proto/api_config_http.yaml,output_format=yaml,allow_merge=true,merge_file_name=stats") .bytes(["."]) - 
.type_attribute(".", "#[actix_prost_macros::serde]"); + .type_attribute(".", "#[actix_prost_macros::serde]") + .field_attribute(".blockscout.stats.v1.Point.is_approximate", "#[serde(skip_serializing_if = \"std::ops::Not::not\")]") + .field_attribute(".blockscout.stats.v1.Point.is_approximate", "#[serde(default)]"); config.compile_protos(protos, includes)?; Ok(()) diff --git a/stats/stats-proto/proto/stats.proto b/stats/stats-proto/proto/stats.proto index cf4e5c5de..eb2814996 100644 --- a/stats/stats-proto/proto/stats.proto +++ b/stats/stats-proto/proto/stats.proto @@ -34,6 +34,7 @@ message GetLineChartRequest { message Point { string date = 1; string value = 2; + bool is_approximate = 3; } message LineChart { repeated Point chart = 1; } @@ -53,4 +54,4 @@ message LineChartSection { repeated LineChartInfo charts = 3; } -message LineCharts { repeated LineChartSection sections = 1; } \ No newline at end of file +message LineCharts { repeated LineChartSection sections = 1; } diff --git a/stats/stats-proto/src/lib.rs b/stats/stats-proto/src/lib.rs index 9862263c3..0a085ab7f 100644 --- a/stats/stats-proto/src/lib.rs +++ b/stats/stats-proto/src/lib.rs @@ -6,3 +6,6 @@ pub mod blockscout { } } } + +#[cfg(test)] +mod tests; diff --git a/stats/stats-proto/src/tests.rs b/stats/stats-proto/src/tests.rs new file mode 100644 index 000000000..e65463e30 --- /dev/null +++ b/stats/stats-proto/src/tests.rs @@ -0,0 +1,39 @@ +use prost::Message; + +use crate::blockscout::stats::v1::{self as proto}; + +const PRECISE_POINT_1: &str = r#" +{ + "date": "2024-03-14", + "value": "188542399", + "isApproximate": false +} +"#; + +const PRECISE_POINT_2: &str = r#" +{ + "date": "2024-03-14", + "value": "188542399" +} +"#; + +#[test] +fn is_approximate_serialization() { + // deserialize + let point: proto::Point = serde_json::from_str(PRECISE_POINT_1).unwrap(); + assert!(!point.is_approximate); + let point: proto::Point = serde_json::from_str(PRECISE_POINT_2).unwrap(); + 
assert!(!point.is_approximate); + + // serialize + let point = proto::Point { + date: "2024-03-14".to_owned(), + value: "188542399".to_owned(), + is_approximate: false, + }; + let serialized_point = serde_json::to_string(&point).unwrap(); + assert_eq!( + serialized_point.replace([' ', '\n'], ""), + PRECISE_POINT_2.replace([' ', '\n'], "") + ); +} diff --git a/stats/stats-proto/swagger/stats.swagger.yaml b/stats/stats-proto/swagger/stats.swagger.yaml index 41b2a6786..6398e9a32 100644 --- a/stats/stats-proto/swagger/stats.swagger.yaml +++ b/stats/stats-proto/swagger/stats.swagger.yaml @@ -190,4 +190,6 @@ definitions: type: string value: type: string + isApproximate: + type: boolean title: All integers are encoded as strings to prevent data loss diff --git a/stats/stats-server/src/charts.rs b/stats/stats-server/src/charts.rs index 442792a4f..632b65d61 100644 --- a/stats/stats-server/src/charts.rs +++ b/stats/stats-server/src/charts.rs @@ -2,17 +2,17 @@ use crate::config::{ toml_config::{Config, LineChartSection}, ChartSettings, }; -use stats::{cache::Cache, counters, entity::sea_orm_active_enums::ChartType, lines, Chart}; +use stats::{cache::Cache, counters, entity::sea_orm_active_enums::ChartType, lines, ChartUpdater}; use std::{ collections::{BTreeMap, HashMap, HashSet}, hash::Hash, sync::Arc, }; -pub type ArcChart = Arc; +pub type ArcChartUpdater = Arc; pub struct ChartInfo { - pub chart: ArcChart, + pub chart: ArcChartUpdater, pub settings: ChartSettings, } @@ -131,7 +131,7 @@ impl Charts { .collect() } - fn all_charts() -> Vec { + fn all_charts() -> Vec { let accounts_cache = Cache::default(); let new_txns = Arc::new(lines::NewTxns::default()); let new_native_coin_transfers = Arc::new(lines::NewNativeCoinTransfers::default()); diff --git a/stats/stats-server/src/read_service.rs b/stats/stats-server/src/read_service.rs index 4c0b89628..5be90e707 100644 --- a/stats/stats-server/src/read_service.rs +++ b/stats/stats-server/src/read_service.rs @@ -1,6 +1,6 @@ -use 
crate::{charts::Charts, serializers::serialize_line_points}; +use crate::{charts::Charts, serializers::serialize_line_points, settings::LimitsSettings}; use async_trait::async_trait; -use chrono::NaiveDate; +use chrono::{Duration, NaiveDate, Utc}; use sea_orm::{DatabaseConnection, DbErr}; use stats::ReadError; use stats_proto::blockscout::stats::v1::{ @@ -14,17 +14,37 @@ use tonic::{Request, Response, Status}; pub struct ReadService { db: Arc, charts: Arc, + limits: ReadLimits, } impl ReadService { - pub async fn new(db: Arc, charts: Arc) -> Result { - Ok(Self { db, charts }) + pub async fn new( + db: Arc, + charts: Arc, + limits: ReadLimits, + ) -> Result { + Ok(Self { db, charts, limits }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ReadLimits { + /// See [`LimitsSettings::request_interval_limit_days`] + pub request_interval_limit: Duration, +} + +impl From for ReadLimits { + fn from(value: LimitsSettings) -> Self { + Self { + request_interval_limit: Duration::days(value.request_interval_limit_days.into()), + } } } fn map_read_error(err: ReadError) -> Status { match &err { ReadError::NotFound(_) => Status::not_found(err.to_string()), + ReadError::IntervalLimitExceeded(_) => Status::invalid_argument(err.to_string()), _ => { tracing::error!(err = ?err, "internal read error"); Status::internal(err.to_string()) @@ -56,7 +76,7 @@ impl StatsService for ReadService { .filter_map(|(counter, info)| { data.remove(&counter.id).map(|point| { let point: stats::DateValue = if info.chart.relevant_or_zero() { - point.relevant_or_zero() + point.relevant_or_zero(Utc::now().date_naive()) } else { point }; @@ -90,18 +110,20 @@ impl StatsService for ReadService { .and_then(|date| NaiveDate::from_str(&date).ok()); let to = request.to.and_then(|date| NaiveDate::from_str(&date).ok()); let policy = Some(chart_info.chart.missing_date_policy()); - let mut data = stats::get_chart_data(&self.db, &request.name, from, to, policy) - .await - .map_err(map_read_error)?; + let 
mark_approx = chart_info.chart.approximate_trailing_points(); + let interval_limit = Some(self.limits.request_interval_limit); + let data = stats::get_chart_data( + &self.db, + &request.name, + from, + to, + interval_limit, + policy, + mark_approx, + ) + .await + .map_err(map_read_error)?; - if chart_info.chart.drop_last_point() { - // remove last data point, because it can be partially updated - if let Some(last) = data.last() { - if last.is_partial() { - data.pop(); - } - } - } let serialized_chart = serialize_line_points(data); Ok(Response::new(LineChart { chart: serialized_chart, diff --git a/stats/stats-server/src/serializers.rs b/stats/stats-server/src/serializers.rs index c5ee9eb6e..9e3e00757 100644 --- a/stats/stats-server/src/serializers.rs +++ b/stats/stats-server/src/serializers.rs @@ -1,11 +1,12 @@ -use stats::DateValue; +use stats::ExtendedDateValue; use stats_proto::blockscout::stats::v1::Point; -pub fn serialize_line_points(data: Vec) -> Vec { +pub fn serialize_line_points(data: Vec) -> Vec { data.into_iter() .map(|point| Point { date: point.date.to_string(), value: point.value, + is_approximate: point.is_approximate, }) .collect() } diff --git a/stats/stats-server/src/server.rs b/stats/stats-server/src/server.rs index 9a8ee64e1..936512043 100644 --- a/stats/stats-server/src/server.rs +++ b/stats/stats-server/src/server.rs @@ -78,7 +78,7 @@ pub async fn stats(settings: Settings) -> Result<(), anyhow::Error> { .await; }); - let read_service = Arc::new(ReadService::new(db, charts).await?); + let read_service = Arc::new(ReadService::new(db, charts, settings.limits.into()).await?); let health = Arc::new(HealthService::default()); let grpc_router = grpc_router(read_service.clone(), health.clone()); diff --git a/stats/stats-server/src/settings.rs b/stats/stats-server/src/settings.rs index 448030795..e02e2351d 100644 --- a/stats/stats-server/src/settings.rs +++ b/stats/stats-server/src/settings.rs @@ -21,6 +21,7 @@ pub struct Settings { pub 
default_schedule: Schedule, pub force_update_on_start: Option, // None = no update pub concurrent_start_updates: usize, + pub limits: LimitsSettings, pub charts_config: PathBuf, pub server: ServerSettings, @@ -48,6 +49,7 @@ impl Default for Settings { default_schedule: Schedule::from_str("0 0 1 * * * *").unwrap(), force_update_on_start: Some(false), concurrent_start_updates: 3, + limits: Default::default(), charts_config: PathBuf::from_str("config/charts.json").unwrap(), blockscout_db_url: Default::default(), create_database: Default::default(), @@ -59,6 +61,27 @@ impl Default for Settings { } } +/// Various limits like rate limiting and restrictions on input. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default, deny_unknown_fields)] +pub struct LimitsSettings { + /// Limit date interval on corresponding requests (in days). + /// (i.e. `?from=2024-03-17&to=2024-04-17`). + /// + /// If start or end of the range is left empty, min/max values + /// from DB are considered. 
+ pub request_interval_limit_days: u32, +} + +impl Default for LimitsSettings { + fn default() -> Self { + Self { + // ~500 years seems reasonable + request_interval_limit_days: 182500, + } + } +} + impl ConfigSettings for Settings { const SERVICE_NAME: &'static str = "STATS"; } diff --git a/stats/stats-server/src/update_service.rs b/stats/stats-server/src/update_service.rs index e0d956b5c..6badf807e 100644 --- a/stats/stats-server/src/update_service.rs +++ b/stats/stats-server/src/update_service.rs @@ -1,4 +1,4 @@ -use crate::charts::{ArcChart, Charts}; +use crate::charts::{ArcChartUpdater, Charts}; use chrono::Utc; use cron::Schedule; use sea_orm::{DatabaseConnection, DbErr}; @@ -58,10 +58,10 @@ impl UpdateService { }) .collect::>(); futures::future::join_all(tasks).await; - tracing::info!("initial updating is done"); + tracing::info!("initial update is done"); } - fn spawn_chart_updater(self: &Arc, chart: ArcChart, default_schedule: &Schedule) { + fn spawn_chart_updater(self: &Arc, chart: ArcChartUpdater, default_schedule: &Schedule) { let chart_info = self .charts .charts_info @@ -78,14 +78,14 @@ impl UpdateService { tokio::spawn(async move { this.run_cron(chart, schedule).await }); } - async fn update(self: Arc, chart: ArcChart, force_full: bool) { + async fn update(self: Arc, chart: ArcChartUpdater, force_full: bool) { tracing::info!(chart = chart.name(), "updating chart"); let result = { let _timer = stats::metrics::CHART_UPDATE_TIME .with_label_values(&[chart.name()]) .start_timer(); chart - .update_with_mutex(&self.db, &self.blockscout, force_full) + .update_with_mutex(&self.db, &self.blockscout, chrono::Utc::now(), force_full) .await }; if let Err(err) = result { @@ -98,7 +98,7 @@ impl UpdateService { } } - async fn run_cron(self: Arc, chart: ArcChart, schedule: Schedule) { + async fn run_cron(self: Arc, chart: ArcChartUpdater, schedule: Schedule) { loop { let sleep_duration = time_till_next_call(&schedule); tracing::info!( diff --git 
a/stats/stats-server/tests/counters.rs b/stats/stats-server/tests/counters.rs index e45dfd092..f656bcedc 100644 --- a/stats/stats-server/tests/counters.rs +++ b/stats/stats-server/tests/counters.rs @@ -2,6 +2,7 @@ use blockscout_service_launcher::{ launcher::ConfigSettings, test_server::{get_test_server_settings, init_server, send_get_request}, }; +use chrono::NaiveDate; use stats::tests::{init_db::init_db_all, mock_blockscout::fill_mock_blockscout_data}; use stats_proto::blockscout::stats::v1::Counters; use stats_server::{stats, Settings}; @@ -11,7 +12,7 @@ use std::{collections::HashSet, path::PathBuf, str::FromStr}; #[ignore = "needs database"] async fn test_counters_ok() { let (stats_db, blockscout_db) = init_db_all("test_counters_ok").await; - fill_mock_blockscout_data(&blockscout_db, "2023-03-01").await; + fill_mock_blockscout_data(&blockscout_db, NaiveDate::from_str("2023-03-01").unwrap()).await; std::env::set_var("STATS__CONFIG", "./tests/config/test.toml"); let mut settings = Settings::build().expect("Failed to build settings"); diff --git a/stats/stats-server/tests/lines.rs b/stats/stats-server/tests/lines.rs index f63aea595..ab1be98e8 100644 --- a/stats/stats-server/tests/lines.rs +++ b/stats/stats-server/tests/lines.rs @@ -2,6 +2,7 @@ use blockscout_service_launcher::{ launcher::ConfigSettings, test_server::{get_test_server_settings, init_server, send_get_request}, }; +use chrono::NaiveDate; use stats::tests::{init_db::init_db_all, mock_blockscout::fill_mock_blockscout_data}; use stats_server::{stats, Settings}; use std::{path::PathBuf, str::FromStr}; @@ -10,7 +11,7 @@ use std::{path::PathBuf, str::FromStr}; #[ignore = "needs database"] async fn test_lines_ok() { let (stats_db, blockscout_db) = init_db_all("test_lines_ok").await; - fill_mock_blockscout_data(&blockscout_db, "2023-03-01").await; + fill_mock_blockscout_data(&blockscout_db, NaiveDate::from_str("2023-03-01").unwrap()).await; std::env::set_var("STATS__CONFIG", "./tests/config/test.toml"); let 
mut settings = Settings::build().expect("Failed to build settings"); diff --git a/stats/stats/entity/src/chart_data.rs b/stats/stats/entity/src/chart_data.rs index 57a5b3506..43f244d3a 100644 --- a/stats/stats/entity/src/chart_data.rs +++ b/stats/stats/entity/src/chart_data.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.4 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15 use sea_orm::entity::prelude::*; @@ -10,7 +10,7 @@ pub struct Model { pub chart_id: i32, pub date: Date, pub value: String, - pub created_at: DateTime, + pub created_at: DateTimeWithTimeZone, pub min_blockscout_block: Option, } diff --git a/stats/stats/entity/src/charts.rs b/stats/stats/entity/src/charts.rs index 78b9c8ed8..24502e9fb 100644 --- a/stats/stats/entity/src/charts.rs +++ b/stats/stats/entity/src/charts.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.4 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15 use super::sea_orm_active_enums::ChartType; use sea_orm::entity::prelude::*; @@ -11,7 +11,8 @@ pub struct Model { #[sea_orm(unique)] pub name: String, pub chart_type: ChartType, - pub created_at: DateTime, + pub created_at: DateTimeWithTimeZone, + pub last_updated_at: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/stats/stats/entity/src/lib.rs b/stats/stats/entity/src/lib.rs index 76b349c2f..0bc8535a5 100644 --- a/stats/stats/entity/src/lib.rs +++ b/stats/stats/entity/src/lib.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.4 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15 pub mod prelude; diff --git a/stats/stats/entity/src/prelude.rs b/stats/stats/entity/src/prelude.rs index 031c04ec6..fadcad44f 100644 --- a/stats/stats/entity/src/prelude.rs +++ b/stats/stats/entity/src/prelude.rs @@ -1,3 +1,3 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.4 +//! `SeaORM` Entity. 
Generated by sea-orm-codegen 0.12.15 pub use super::{chart_data::Entity as ChartData, charts::Entity as Charts}; diff --git a/stats/stats/entity/src/sea_orm_active_enums.rs b/stats/stats/entity/src/sea_orm_active_enums.rs index ff81973b2..15a58245e 100644 --- a/stats/stats/entity/src/sea_orm_active_enums.rs +++ b/stats/stats/entity/src/sea_orm_active_enums.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.4 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15 use sea_orm::entity::prelude::*; diff --git a/stats/stats/migration/src/lib.rs b/stats/stats/migration/src/lib.rs index bbfd6cecd..1e16257ac 100644 --- a/stats/stats/migration/src/lib.rs +++ b/stats/stats/migration/src/lib.rs @@ -3,6 +3,7 @@ use sea_orm_migration::sea_orm::{ConnectionTrait, Statement, TransactionTrait}; mod m20220101_000001_init; mod m20230814_105206_drop_zero_timestamp; +mod m20240416_090545_add_updated_at_column; pub struct Migrator; @@ -12,6 +13,7 @@ impl MigratorTrait for Migrator { vec![ Box::new(m20220101_000001_init::Migration), Box::new(m20230814_105206_drop_zero_timestamp::Migration), + Box::new(m20240416_090545_add_updated_at_column::Migration), ] } } diff --git a/stats/stats/migration/src/m20240416_090545_add_updated_at_column.rs b/stats/stats/migration/src/m20240416_090545_add_updated_at_column.rs new file mode 100644 index 000000000..78c2ecbb4 --- /dev/null +++ b/stats/stats/migration/src/m20240416_090545_add_updated_at_column.rs @@ -0,0 +1,46 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // add & populate column + // fix timezone of timestamps (everywhere in the code UTC is used) + let sql = r#" + ALTER TABLE charts + ADD COLUMN last_updated_at timestamptz; + + UPDATE charts SET last_updated_at = ( + SELECT max(date) FROM chart_data + WHERE charts.id = chart_id + 
GROUP BY chart_id); + + ALTER TABLE charts + ALTER COLUMN created_at TYPE timestamptz USING created_at + , ALTER COLUMN created_at SET DEFAULT (now() at time zone 'utc'); + ALTER TABLE chart_data + ALTER COLUMN created_at TYPE timestamptz USING created_at + , ALTER COLUMN created_at SET DEFAULT (now() at time zone 'utc'); + "#; + crate::from_sql(manager, sql).await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // appropriate timezone (local) should be considered automatically; + // local is assumed because there was no mention of timezone in init migration + let sql = r#" + ALTER TABLE charts + DROP COLUMN last_updated_at; + + ALTER TABLE charts + ALTER COLUMN created_at TYPE timestamp USING created_at + , ALTER COLUMN created_at SET DEFAULT (now()); + ALTER TABLE chart_data + ALTER COLUMN created_at TYPE timestamp USING created_at + , ALTER COLUMN created_at SET DEFAULT (now()); + "#; + crate::from_sql(manager, sql).await + } +} diff --git a/stats/stats/src/charts/chart.rs b/stats/stats/src/charts/chart.rs index ebd316d1c..2e826b9c6 100644 --- a/stats/stats/src/charts/chart.rs +++ b/stats/stats/src/charts/chart.rs @@ -1,6 +1,6 @@ -use super::mutex::get_global_update_mutex; use crate::ReadError; use async_trait::async_trait; +use chrono::Duration; use entity::{charts, sea_orm_active_enums::ChartType}; use sea_orm::{prelude::*, sea_query, FromQueryResult, QuerySelect, Set}; use thiserror::Error; @@ -13,6 +13,8 @@ pub enum UpdateError { StatsDB(DbErr), #[error("chart {0} not found")] NotFound(String), + #[error("date interval limit ({limit}) is exceeded; choose smaller time interval.")] + IntervalLimitExceeded { limit: Duration }, #[error("internal error: {0}")] Internal(String), } @@ -22,6 +24,7 @@ impl From for UpdateError { match read { ReadError::DB(db) => UpdateError::StatsDB(db), ReadError::NotFound(err) => UpdateError::NotFound(err), + ReadError::IntervalLimitExceeded(limit) => UpdateError::IntervalLimitExceeded { limit }, } } 
} @@ -42,43 +45,35 @@ pub trait Chart: Sync { fn relevant_or_zero(&self) -> bool { false } - fn drop_last_point(&self) -> bool { - self.chart_type() == ChartType::Line + /// Number of last values that are considered approximate. + /// (ordered by time) + /// + /// E.g. how many end values should be recalculated on (kinda) + /// lazy update (where `get_last_row` is retrieved successfully). + /// Also controls marking points as approximate when returning data. + /// + /// ## Value + /// + /// Usually set to 1 for line charts. Currently they have resolution of + /// 1 day. Also, data for portion of the (last) day has to be recalculated + /// on the next day. + /// + /// I.e. for number of blocks per day, stats for current day (0) are + /// not complete because blocks will be produced till the end of the day. + /// |===|= | + /// day -1 0 + fn approximate_trailing_points(&self) -> u64 { + if self.chart_type() == ChartType::Counter { + // there's only one value in counter + 0 + } else { + 1 + } } async fn create(&self, db: &DatabaseConnection) -> Result<(), DbErr> { create_chart(db, self.name().into(), self.chart_type()).await } - - async fn update( - &self, - db: &DatabaseConnection, - blockscout: &DatabaseConnection, - force_full: bool, - ) -> Result<(), UpdateError>; - - async fn update_with_mutex( - &self, - db: &DatabaseConnection, - blockscout: &DatabaseConnection, - force_full: bool, - ) -> Result<(), UpdateError> { - let name = self.name(); - let mutex = get_global_update_mutex(name).await; - let _permit = { - match mutex.try_lock() { - Ok(v) => v, - Err(_) => { - tracing::warn!( - chart_name = name, - "found locked update mutex, waiting for unlock" - ); - mutex.lock().await - } - } - }; - self.update(db, blockscout, force_full).await - } } #[derive(Debug, FromQueryResult)] diff --git a/stats/stats/src/charts/counters/average_block_time.rs b/stats/stats/src/charts/counters/average_block_time.rs index 2e9513eb5..3df88fc8a 100644 --- 
a/stats/stats/src/charts/counters/average_block_time.rs +++ b/stats/stats/src/charts/counters/average_block_time.rs @@ -1,7 +1,7 @@ use crate::{ - charts::{ - insert::{DateValue, DateValueDouble}, - updater::ChartFullUpdater, + charts::db_interaction::{ + chart_updaters::{ChartFullUpdater, ChartUpdater}, + types::{DateValue, DateValueDouble}, }, UpdateError, }; @@ -56,14 +56,19 @@ impl crate::Chart for AverageBlockTime { fn chart_type(&self) -> ChartType { ChartType::Counter } +} - async fn update( +#[async_trait] +impl ChartUpdater for AverageBlockTime { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/counters/completed_txns.rs b/stats/stats/src/charts/counters/completed_txns.rs index 49323af18..f44f5d38b 100644 --- a/stats/stats/src/charts/counters/completed_txns.rs +++ b/stats/stats/src/charts/counters/completed_txns.rs @@ -1,5 +1,8 @@ use crate::{ - charts::{insert::DateValue, updater::ChartFullUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartFullUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; @@ -62,14 +65,19 @@ impl crate::Chart for CompletedTxns { fn chart_type(&self) -> ChartType { ChartType::Counter } +} - async fn update( +#[async_trait] +impl ChartUpdater for CompletedTxns { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/counters/last_new_contracts.rs 
b/stats/stats/src/charts/counters/last_new_contracts.rs index 4d010fa9b..734db13bb 100644 --- a/stats/stats/src/charts/counters/last_new_contracts.rs +++ b/stats/stats/src/charts/counters/last_new_contracts.rs @@ -1,8 +1,10 @@ use crate::{ charts::{ create_chart, - insert::DateValue, - updater::{last_point, ChartDependentUpdater}, + db_interaction::{ + chart_updaters::{last_point, ChartDependentUpdater, ChartUpdater}, + types::DateValue, + }, }, lines::NewContracts, UpdateError, @@ -51,14 +53,19 @@ impl crate::Chart for LastNewContracts { self.parent.create(db).await?; create_chart(db, self.name().into(), self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for LastNewContracts { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/counters/last_new_verified_contracts.rs b/stats/stats/src/charts/counters/last_new_verified_contracts.rs index 767cb260c..da7e31f60 100644 --- a/stats/stats/src/charts/counters/last_new_verified_contracts.rs +++ b/stats/stats/src/charts/counters/last_new_verified_contracts.rs @@ -1,8 +1,10 @@ use crate::{ charts::{ create_chart, - insert::DateValue, - updater::{last_point, ChartDependentUpdater}, + db_interaction::{ + chart_updaters::{last_point, ChartDependentUpdater, ChartUpdater}, + types::DateValue, + }, }, lines::NewVerifiedContracts, UpdateError, @@ -51,14 +53,19 @@ impl crate::Chart for LastNewVerifiedContracts { self.parent.create(db).await?; create_chart(db, self.name().into(), self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for LastNewVerifiedContracts { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: 
chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/counters/mock.rs b/stats/stats/src/charts/counters/mock.rs index 65460f1b0..814574075 100644 --- a/stats/stats/src/charts/counters/mock.rs +++ b/stats/stats/src/charts/counters/mock.rs @@ -1,8 +1,12 @@ use crate::{ - charts::{insert::DateValue, updater::ChartFullUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartFullUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; +use chrono::NaiveDate; use entity::sea_orm_active_enums::ChartType; use sea_orm::prelude::*; @@ -25,7 +29,7 @@ impl ChartFullUpdater for MockCounter { _blockscout: &DatabaseConnection, ) -> Result, UpdateError> { let item = DateValue { - date: chrono::offset::Local::now().date_naive(), + date: NaiveDate::parse_from_str("2022-11-12", "%Y-%m-%d").unwrap(), value: self.value.clone(), }; Ok(vec![item]) @@ -41,13 +45,18 @@ impl crate::Chart for MockCounter { fn chart_type(&self) -> ChartType { ChartType::Counter } +} - async fn update( +#[async_trait] +impl ChartUpdater for MockCounter { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/counters/total_accounts.rs b/stats/stats/src/charts/counters/total_accounts.rs index a66287c0f..0a02d5873 100644 --- a/stats/stats/src/charts/counters/total_accounts.rs +++ b/stats/stats/src/charts/counters/total_accounts.rs @@ -1,8 +1,10 @@ use crate::{ charts::{ cache::Cache, - insert::{DateValue, DateValueInt}, - updater::ChartFullUpdater, + db_interaction::{ + 
chart_updaters::{ChartFullUpdater, ChartUpdater}, + types::{DateValue, DateValueInt}, + }, }, lines::{AccountsGrowth, NewAccounts}, UpdateError, @@ -52,14 +54,19 @@ impl crate::Chart for TotalAccounts { fn chart_type(&self) -> ChartType { ChartType::Counter } +} - async fn update( +#[async_trait] +impl ChartUpdater for TotalAccounts { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/counters/total_addresses.rs b/stats/stats/src/charts/counters/total_addresses.rs index 4228e7b91..98cf534b3 100644 --- a/stats/stats/src/charts/counters/total_addresses.rs +++ b/stats/stats/src/charts/counters/total_addresses.rs @@ -1,5 +1,8 @@ use crate::{ - charts::{insert::DateValue, updater::ChartFullUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartFullUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; @@ -46,14 +49,19 @@ impl crate::Chart for TotalAddresses { fn chart_type(&self) -> ChartType { ChartType::Counter } +} - async fn update( +#[async_trait] +impl ChartUpdater for TotalAddresses { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/counters/total_blocks.rs b/stats/stats/src/charts/counters/total_blocks.rs index 07449584e..b8981c5fa 100644 --- a/stats/stats/src/charts/counters/total_blocks.rs +++ b/stats/stats/src/charts/counters/total_blocks.rs @@ -1,5 +1,8 @@ use crate::{ - charts::{insert::DateValue, 
updater::ChartFullUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartFullUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; @@ -51,14 +54,19 @@ impl crate::Chart for TotalBlocks { fn chart_type(&self) -> ChartType { ChartType::Counter } +} - async fn update( +#[async_trait] +impl ChartUpdater for TotalBlocks { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } @@ -82,6 +90,8 @@ mod tests { let _ = tracing_subscriber::fmt::try_init(); let (db, blockscout) = init_db_all("update_total_blocks_recurrent").await; let updater = TotalBlocks::default(); + let current_time = chrono::DateTime::from_str("2023-03-01T12:00:00Z").unwrap(); + let current_date = current_time.date_naive(); updater.create(&db).await.unwrap(); @@ -95,9 +105,12 @@ mod tests { .await .unwrap(); - fill_mock_blockscout_data(&blockscout, "2023-03-01").await; + fill_mock_blockscout_data(&blockscout, current_date).await; - updater.update(&db, &blockscout, true).await.unwrap(); + updater + .update(&db, &blockscout, current_time, true) + .await + .unwrap(); let data = get_counters(&db).await.unwrap(); assert_eq!("13", data[updater.name()].value); } @@ -108,12 +121,17 @@ mod tests { let _ = tracing_subscriber::fmt::try_init(); let (db, blockscout) = init_db_all("update_total_blocks_fresh").await; let updater = TotalBlocks::default(); + let current_time = chrono::DateTime::from_str("2022-11-12T12:00:00Z").unwrap(); + let current_date = current_time.date_naive(); updater.create(&db).await.unwrap(); - fill_mock_blockscout_data(&blockscout, "2022-11-12").await; + fill_mock_blockscout_data(&blockscout, current_date).await; - updater.update(&db, &blockscout, true).await.unwrap(); + updater + .update(&db, 
&blockscout, current_time, true) + .await + .unwrap(); let data = get_counters(&db).await.unwrap(); assert_eq!("9", data[updater.name()].value); } @@ -124,6 +142,8 @@ mod tests { let _ = tracing_subscriber::fmt::try_init(); let (db, blockscout) = init_db_all("update_total_blocks_last").await; let updater = TotalBlocks::default(); + let current_time = chrono::DateTime::from_str("2023-03-01T12:00:00Z").unwrap(); + let current_date = current_time.date_naive(); updater.create(&db).await.unwrap(); @@ -137,9 +157,12 @@ mod tests { .await .unwrap(); - fill_mock_blockscout_data(&blockscout, "2023-03-01").await; + fill_mock_blockscout_data(&blockscout, current_date).await; - updater.update(&db, &blockscout, true).await.unwrap(); + updater + .update(&db, &blockscout, current_time, true) + .await + .unwrap(); let data = get_counters(&db).await.unwrap(); assert_eq!("13", data[updater.name()].value); } diff --git a/stats/stats/src/charts/counters/total_contracts.rs b/stats/stats/src/charts/counters/total_contracts.rs index 6db6ae4a3..211ed93ef 100644 --- a/stats/stats/src/charts/counters/total_contracts.rs +++ b/stats/stats/src/charts/counters/total_contracts.rs @@ -1,8 +1,10 @@ use crate::{ charts::{ create_chart, - insert::DateValue, - updater::{last_point, ChartDependentUpdater}, + db_interaction::{ + chart_updaters::{last_point, ChartDependentUpdater, ChartUpdater}, + types::DateValue, + }, }, lines::ContractsGrowth, UpdateError, @@ -49,14 +51,19 @@ impl crate::Chart for TotalContracts { self.parent.create(db).await?; create_chart(db, self.name().into(), self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for TotalContracts { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff 
--git a/stats/stats/src/charts/counters/total_native_coin_holders.rs b/stats/stats/src/charts/counters/total_native_coin_holders.rs index 5fe3dd798..216015551 100644 --- a/stats/stats/src/charts/counters/total_native_coin_holders.rs +++ b/stats/stats/src/charts/counters/total_native_coin_holders.rs @@ -1,8 +1,10 @@ use crate::{ charts::{ create_chart, - insert::DateValue, - updater::{last_point, ChartDependentUpdater}, + db_interaction::{ + chart_updaters::{last_point, ChartDependentUpdater, ChartUpdater}, + types::DateValue, + }, }, lines::NativeCoinHoldersGrowth, UpdateError, @@ -49,14 +51,19 @@ impl crate::Chart for TotalNativeCoinHolders { self.parent.create(db).await?; create_chart(db, self.name().into(), self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for TotalNativeCoinHolders { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/counters/total_native_coin_transfers.rs b/stats/stats/src/charts/counters/total_native_coin_transfers.rs index 601e85022..c492122e1 100644 --- a/stats/stats/src/charts/counters/total_native_coin_transfers.rs +++ b/stats/stats/src/charts/counters/total_native_coin_transfers.rs @@ -1,8 +1,10 @@ use crate::{ charts::{ create_chart, - insert::DateValue, - updater::{parse_and_sum, ChartDependentUpdater}, + db_interaction::{ + chart_updaters::{parse_and_sum, ChartDependentUpdater, ChartUpdater}, + types::DateValue, + }, }, lines::NewNativeCoinTransfers, Chart, UpdateError, @@ -49,14 +51,19 @@ impl crate::Chart for TotalNativeCoinTransfers { self.parent.create(db).await?; create_chart(db, self.name().into(), self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for 
TotalNativeCoinTransfers { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/counters/total_tokens.rs b/stats/stats/src/charts/counters/total_tokens.rs index 6362f649a..32bde30a1 100644 --- a/stats/stats/src/charts/counters/total_tokens.rs +++ b/stats/stats/src/charts/counters/total_tokens.rs @@ -1,5 +1,8 @@ use crate::{ - charts::{insert::DateValue, updater::ChartFullUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartFullUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; @@ -49,14 +52,19 @@ impl crate::Chart for TotalTokens { fn chart_type(&self) -> ChartType { ChartType::Counter } +} - async fn update( +#[async_trait] +impl ChartUpdater for TotalTokens { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/counters/total_txns.rs b/stats/stats/src/charts/counters/total_txns.rs index 21b0400da..71cd86261 100644 --- a/stats/stats/src/charts/counters/total_txns.rs +++ b/stats/stats/src/charts/counters/total_txns.rs @@ -1,8 +1,10 @@ use crate::{ charts::{ create_chart, - insert::DateValue, - updater::{parse_and_sum, ChartDependentUpdater}, + db_interaction::{ + chart_updaters::{parse_and_sum, ChartDependentUpdater, ChartUpdater}, + types::DateValue, + }, }, lines::NewTxns, Chart, UpdateError, @@ -49,14 +51,19 @@ impl crate::Chart for TotalTxns { self.parent.create(db).await?; create_chart(db, self.name().into(), 
self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for TotalTxns { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/counters/total_verified_contracts.rs b/stats/stats/src/charts/counters/total_verified_contracts.rs index 48e136073..108b2f72b 100644 --- a/stats/stats/src/charts/counters/total_verified_contracts.rs +++ b/stats/stats/src/charts/counters/total_verified_contracts.rs @@ -1,8 +1,10 @@ use crate::{ charts::{ create_chart, - insert::DateValue, - updater::{last_point, ChartDependentUpdater}, + db_interaction::{ + chart_updaters::{last_point, ChartDependentUpdater, ChartUpdater}, + types::DateValue, + }, }, lines::VerifiedContractsGrowth, UpdateError, @@ -49,14 +51,19 @@ impl crate::Chart for TotalVerifiedContracts { self.parent.create(db).await?; create_chart(db, self.name().into(), self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for TotalVerifiedContracts { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/updater/batch.rs b/stats/stats/src/charts/db_interaction/chart_updaters/batch.rs similarity index 70% rename from stats/stats/src/charts/updater/batch.rs rename to stats/stats/src/charts/db_interaction/chart_updaters/batch.rs index f8e5adbef..6e5a6a9f2 100644 --- a/stats/stats/src/charts/updater/batch.rs +++ b/stats/stats/src/charts/db_interaction/chart_updaters/batch.rs @@ -1,15 +1,28 @@ -use 
super::{get_last_row, get_min_block_blockscout, get_min_date_blockscout}; +//! Similar to `ChartPartialUpdater`, but performs multiple updates for each time period +//! ([`step_duration`]). +//! +//! Useful when each period is independent and this division can help with performance. +//! I.e. if updating a large interval at once (e.g. like `ChartPartialUpdater` does in +//! `force_full` or initial updates) is too expensive. + +use super::{ + common_operations::{get_min_block_blockscout, get_min_date_blockscout, get_nth_last_row}, + ChartUpdater, +}; use crate::{ - charts::{find_chart, insert::insert_data_many}, - metrics, Chart, DateValue, UpdateError, + charts::{ + db_interaction::{types::DateValue, write::insert_data_many}, + find_chart, + }, + metrics, UpdateError, }; use async_trait::async_trait; -use chrono::{Duration, NaiveDate, Utc}; +use chrono::{DateTime, Duration, NaiveDate, Utc}; use sea_orm::{DatabaseConnection, FromQueryResult, Statement, TransactionTrait}; use std::time::Instant; #[async_trait] -pub trait ChartBatchUpdater: Chart { +pub trait ChartBatchUpdater: ChartUpdater { fn get_query(&self, from: NaiveDate, to: NaiveDate) -> Statement; fn step_duration(&self) -> chrono::Duration { chrono::Duration::days(30) @@ -19,6 +32,7 @@ pub trait ChartBatchUpdater: Chart { &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: DateTime, force_full: bool, ) -> Result<(), UpdateError> { let chart_id = find_chart(db, self.name()) @@ -28,24 +42,31 @@ pub trait ChartBatchUpdater: Chart { let min_blockscout_block = get_min_block_blockscout(blockscout) .await .map_err(UpdateError::BlockscoutDB)?; - // set offset to 1 because actual last row can be partially calculated - let offset = Some(1); - let last_row = - get_last_row(self, chart_id, min_blockscout_block, db, force_full, offset).await?; + let offset = Some(self.approximate_trailing_points()); + let last_updated_row = + get_nth_last_row(self, chart_id, min_blockscout_block, db, 
force_full, offset).await?; let _timer = metrics::CHART_FETCH_NEW_DATA_TIME .with_label_values(&[self.name()]) .start_timer(); - tracing::info!(last_row =? last_row, "start batch update"); - self.batch_update(db, blockscout, last_row, chart_id, min_blockscout_block) - .await + tracing::info!(last_updated_row =? last_updated_row, "start batch update"); + self.batch_update( + db, + blockscout, + last_updated_row, + current_time.date_naive(), + chart_id, + min_blockscout_block, + ) + .await } async fn batch_update( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, - last_row: Option, + update_from_row: Option, + today: NaiveDate, chart_id: i32, min_blockscout_block: i64, ) -> Result<(), UpdateError> { @@ -53,16 +74,15 @@ pub trait ChartBatchUpdater: Chart { .begin() .await .map_err(UpdateError::BlockscoutDB)?; - let first_date = match last_row { - Some(last_row) => last_row.date, + let first_date = match update_from_row { + Some(row) => row.date, None => get_min_date_blockscout(&txn) .await .map(|time| time.date()) .map_err(UpdateError::BlockscoutDB)?, }; - let last_date = Utc::now().date_naive(); - let steps = generate_date_ranges(first_date, last_date, self.step_duration()); + let steps = generate_date_ranges(first_date, today, self.step_duration()); let n = steps.len(); for (i, (from, to)) in steps.into_iter().enumerate() { @@ -136,6 +156,11 @@ mod tests { (d("2015-12-17"), d("2016-01-16")), ], ), + ((d("2015-07-20"), d("2015-07-20")), vec![]), + ( + (d("2015-07-20"), d("2015-07-21")), + vec![(d("2015-07-20"), d("2015-08-19"))], + ), ] { let actual = generate_date_ranges(from, to, Duration::days(30)); assert_eq!(expected, actual); diff --git a/stats/stats/src/charts/updater/mod.rs b/stats/stats/src/charts/db_interaction/chart_updaters/common_operations.rs similarity index 75% rename from stats/stats/src/charts/updater/mod.rs rename to stats/stats/src/charts/db_interaction/chart_updaters/common_operations.rs index d4d3d482b..d1ca0417d 100644 --- 
a/stats/stats/src/charts/updater/mod.rs +++ b/stats/stats/src/charts/db_interaction/chart_updaters/common_operations.rs @@ -1,16 +1,13 @@ -use blockscout_db::entity::blocks; -use chrono::{NaiveDate, NaiveDateTime}; -use entity::chart_data; -use sea_orm::{prelude::*, sea_query, ConnectionTrait, FromQueryResult, QueryOrder, QuerySelect}; -mod batch; -mod dependent; -mod full; -mod partial; +//! Collection of common operations to perform while updating. +//! Can be useful for any chart regardless of its type. -pub use batch::ChartBatchUpdater; -pub use dependent::{last_point, parse_and_growth, parse_and_sum, ChartDependentUpdater}; -pub use full::ChartFullUpdater; -pub use partial::ChartPartialUpdater; +use blockscout_db::entity::blocks; +use chrono::{NaiveDate, NaiveDateTime, Offset}; +use entity::{chart_data, charts}; +use sea_orm::{ + prelude::*, sea_query, ConnectionTrait, DatabaseConnection, DbErr, EntityTrait, + FromQueryResult, QueryFilter, QueryOrder, QuerySelect, Set, Unchanged, +}; use crate::{Chart, DateValue, UpdateError}; @@ -70,7 +67,12 @@ struct SyncInfo { pub min_blockscout_block: Option, } -pub async fn get_last_row( +/// Get `offset`th last row. Date of the row can be a starting point for an update. +/// Usually used to retrieve the last 'finalized' row (for which no recomputation is needed). +/// +/// Retrieves `offset`th latest data point from DB, if any. +/// Also returns `None` if inconsistencies are found or `force_full` is set.
+pub async fn get_nth_last_row( chart: &C, chart_id: i32, min_blockscout_block: i64, @@ -82,7 +84,7 @@ where C: Chart + ?Sized, { let offset = offset.unwrap_or(0); - let last_row = if force_full { + let row = if force_full { tracing::info!( min_blockscout_block = min_blockscout_block, chart = chart.name(), @@ -90,7 +92,7 @@ where ); None } else { - let last_row: Option = chart_data::Entity::find() + let row: Option = chart_data::Entity::find() .column(chart_data::Column::Date) .column(chart_data::Column::Value) .column(chart_data::Column::MinBlockscoutBlock) @@ -102,7 +104,7 @@ where .await .map_err(UpdateError::StatsDB)?; - match last_row { + match row { Some(row) => { if let Some(block) = row.min_blockscout_block { if block == min_blockscout_block { @@ -146,5 +148,26 @@ where } }; - Ok(last_row) + Ok(row) +} + +pub async fn set_last_updated_at( + chart_id: i32, + db: &DatabaseConnection, + at: chrono::DateTime, +) -> Result<(), DbErr> +where + Tz: chrono::TimeZone, +{ + let last_updated_at = at.with_timezone(&chrono::Utc.fix()); + let model = charts::ActiveModel { + id: Unchanged(chart_id), + last_updated_at: Set(Some(last_updated_at)), + ..Default::default() + }; + charts::Entity::update(model) + .filter(charts::Column::Id.eq(chart_id)) + .exec(db) + .await?; + Ok(()) } diff --git a/stats/stats/src/charts/updater/dependent.rs b/stats/stats/src/charts/db_interaction/chart_updaters/dependent.rs similarity index 71% rename from stats/stats/src/charts/updater/dependent.rs rename to stats/stats/src/charts/db_interaction/chart_updaters/dependent.rs index d722c4390..717a169c4 100644 --- a/stats/stats/src/charts/updater/dependent.rs +++ b/stats/stats/src/charts/db_interaction/chart_updaters/dependent.rs @@ -1,28 +1,39 @@ -use super::get_min_block_blockscout; +//! Updates chart according to data from another chart. +//! I.e. 
current chart depends on another (on "parent") + +use super::{common_operations::get_min_block_blockscout, ChartUpdater}; use crate::{ charts::{ + db_interaction::{types::DateValue, write::insert_data_many}, find_chart, - insert::{insert_data_many, DateValue}, }, - get_chart_data, Chart, UpdateError, + get_chart_data, UpdateError, }; use async_trait::async_trait; +use chrono::Utc; use sea_orm::prelude::*; use std::{fmt::Display, iter::Sum, ops::AddAssign, str::FromStr, sync::Arc}; #[async_trait] -pub trait ChartDependentUpdater

<P>: Chart +pub trait ChartDependentUpdater<P>: ChartUpdater where - P: Chart + Send, + P: ChartUpdater + Send, { fn parent(&self) -> Arc<P>

; + // Note that usually this chart's `approximate_trailing_points` logically + // matches that of its parent + fn parent_approximate_trailing_points(&self) -> u64 { + self.parent().approximate_trailing_points() + } + async fn get_values(&self, parent_data: Vec) -> Result, UpdateError>; async fn get_parent_data( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result, UpdateError> { let parent = self.parent(); @@ -31,8 +42,20 @@ where parent_chart_name = parent.name(), "updating parent" ); - parent.update_with_mutex(db, blockscout, force_full).await?; - let data = get_chart_data(db, parent.name(), None, None, None).await?; + parent + .update_with_mutex(db, blockscout, current_time, force_full) + .await?; + let data = get_chart_data( + db, + parent.name(), + None, + None, + None, + None, + self.parent_approximate_trailing_points(), + ) + .await?; + let data = data.into_iter().map(DateValue::from).collect(); Ok(data) } @@ -40,6 +63,7 @@ where &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { let chart_id = find_chart(db, self.name()) @@ -49,7 +73,9 @@ where let min_blockscout_block = get_min_block_blockscout(blockscout) .await .map_err(UpdateError::BlockscoutDB)?; - let parent_data = self.get_parent_data(db, blockscout, force_full).await?; + let parent_data = self + .get_parent_data(db, blockscout, current_time, force_full) + .await?; let values = self .get_values(parent_data) .await?
@@ -61,7 +87,7 @@ where } } -pub fn parse_and_growth( +pub fn parse_and_cumsum( mut data: Vec, parent_name: &str, ) -> Result, UpdateError> diff --git a/stats/stats/src/charts/updater/full.rs b/stats/stats/src/charts/db_interaction/chart_updaters/full.rs similarity index 79% rename from stats/stats/src/charts/updater/full.rs rename to stats/stats/src/charts/db_interaction/chart_updaters/full.rs index fdafa5430..e40347a10 100644 --- a/stats/stats/src/charts/updater/full.rs +++ b/stats/stats/src/charts/db_interaction/chart_updaters/full.rs @@ -1,15 +1,19 @@ +//! Re-reads/re-calculates whole chart from the source DB. + +use super::ChartUpdater; use crate::{ charts::{ + db_interaction::{types::DateValue, write::insert_data_many}, find_chart, - insert::{insert_data_many, DateValue}, }, - metrics, Chart, UpdateError, + metrics, UpdateError, }; use async_trait::async_trait; +use chrono::Utc; use sea_orm::prelude::*; #[async_trait] -pub trait ChartFullUpdater: Chart { +pub trait ChartFullUpdater: ChartUpdater { async fn get_values( &self, blockscout: &DatabaseConnection, @@ -19,6 +23,7 @@ pub trait ChartFullUpdater: Chart { &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + _current_time: chrono::DateTime, _force_full: bool, ) -> Result<(), UpdateError> { let chart_id = find_chart(db, self.name()) diff --git a/stats/stats/src/charts/db_interaction/chart_updaters/mod.rs b/stats/stats/src/charts/db_interaction/chart_updaters/mod.rs new file mode 100644 index 000000000..bd28fefb3 --- /dev/null +++ b/stats/stats/src/charts/db_interaction/chart_updaters/mod.rs @@ -0,0 +1,97 @@ +//! Update logic for charts. +//! +//! Depending on the chart nature, various tactics are better fit (in terms of efficiency, performance, etc.). 
+ +use async_trait::async_trait; +use chrono::Utc; +use sea_orm::prelude::*; + +use crate::{ + charts::{find_chart, mutex::get_global_update_mutex}, + Chart, UpdateError, +}; + +mod batch; +pub(crate) mod common_operations; +mod dependent; +mod full; +mod partial; + +pub use batch::ChartBatchUpdater; +pub use dependent::{last_point, parse_and_cumsum, parse_and_sum, ChartDependentUpdater}; +pub use full::ChartFullUpdater; +pub use partial::ChartPartialUpdater; + +#[async_trait] +pub trait ChartUpdater: Chart { + /// Update only data (values) of the chart (`chart_data` table). + /// + /// Implementation is expected to be highly variable. + async fn update_values( + &self, + db: &DatabaseConnection, + blockscout: &DatabaseConnection, + current_time: chrono::DateTime, + force_full: bool, + ) -> Result<(), UpdateError>; + + /// Update only metadata of the chart (`charts` table). + /// + /// Generally better to call after changing chart data to keep + /// the info relevant (i.e. if it depends on values). + async fn update_metadata( + &self, + db: &DatabaseConnection, + _blockscout: &DatabaseConnection, + current_time: chrono::DateTime, + ) -> Result<(), UpdateError> { + let chart_id = find_chart(db, self.name()) + .await + .map_err(UpdateError::StatsDB)? + .ok_or_else(|| UpdateError::NotFound(self.name().into()))?; + common_operations::set_last_updated_at(chart_id, db, current_time) + .await + .map_err(UpdateError::StatsDB) + } + + /// Update data and metadata of the chart. + /// + /// `current_time` is settable mainly for testing purposes, so that + /// code dependent on time (mostly metadata updates) can be reproducibly tested.
+ async fn update( + &self, + db: &DatabaseConnection, + blockscout: &DatabaseConnection, + current_time: chrono::DateTime, + force_full: bool, + ) -> Result<(), UpdateError> { + self.update_values(db, blockscout, current_time, force_full) + .await?; + self.update_metadata(db, blockscout, current_time).await + } + + /// Run [`Self::update`] while holding the global update mutex for the chart + async fn update_with_mutex( + &self, + db: &DatabaseConnection, + blockscout: &DatabaseConnection, + current_time: chrono::DateTime, + force_full: bool, + ) -> Result<(), UpdateError> { + let name = self.name(); + let mutex = get_global_update_mutex(name).await; + let _permit = { + match mutex.try_lock() { + Ok(v) => v, + Err(_) => { + tracing::warn!( + chart_name = name, + "found locked update mutex, waiting for unlock" + ); + mutex.lock().await + } + } + }; + self.update(db, blockscout, current_time, force_full).await + } +} diff --git a/stats/stats/src/charts/updater/partial.rs b/stats/stats/src/charts/db_interaction/chart_updaters/partial.rs similarity index 61% rename from stats/stats/src/charts/updater/partial.rs rename to stats/stats/src/charts/db_interaction/chart_updaters/partial.rs index 61bf70f03..69ed8318f 100644 --- a/stats/stats/src/charts/updater/partial.rs +++ b/stats/stats/src/charts/db_interaction/chart_updaters/partial.rs @@ -1,26 +1,35 @@ -use super::{get_last_row, get_min_block_blockscout}; +//! Only retrieves new values and updates the latest one. +//! +//! In some cases performs a full update (i.e.
when some inconsistency was found or `force_full` is set) + +use super::{ + common_operations::{get_min_block_blockscout, get_nth_last_row}, + ChartUpdater, +}; use crate::{ charts::{ + db_interaction::{types::DateValue, write::insert_data_many}, find_chart, - insert::{insert_data_many, DateValue}, }, - metrics, Chart, UpdateError, + metrics, UpdateError, }; use async_trait::async_trait; +use chrono::Utc; use sea_orm::prelude::*; #[async_trait] -pub trait ChartPartialUpdater: Chart { +pub trait ChartPartialUpdater: ChartUpdater { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError>; async fn update_with_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + _current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { let chart_id = find_chart(db, self.name()) @@ -30,15 +39,14 @@ pub trait ChartPartialUpdater: Chart { let min_blockscout_block = get_min_block_blockscout(blockscout) .await .map_err(UpdateError::BlockscoutDB)?; - // set offset to 1 because actual last row can be partially calculated - let offset = Some(1); - let last_row = - get_last_row(self, chart_id, min_blockscout_block, db, force_full, offset).await?; + let offset = Some(self.approximate_trailing_points()); + let last_updated_row = + get_nth_last_row(self, chart_id, min_blockscout_block, db, force_full, offset).await?; let values = { let _timer = metrics::CHART_FETCH_NEW_DATA_TIME .with_label_values(&[self.name()]) .start_timer(); - self.get_values(blockscout, last_row) + self.get_values(blockscout, last_updated_row) .await? .into_iter() .map(|value| value.active_model(chart_id, Some(min_blockscout_block))) diff --git a/stats/stats/src/charts/db_interaction/mod.rs b/stats/stats/src/charts/db_interaction/mod.rs new file mode 100644 index 000000000..9802ddf60 --- /dev/null +++ b/stats/stats/src/charts/db_interaction/mod.rs @@ -0,0 +1,6 @@ +//! 
Abstracted interaction with DB + +pub mod chart_updaters; +pub mod read; +pub mod types; +pub mod write; diff --git a/stats/stats/src/read.rs b/stats/stats/src/charts/db_interaction/read.rs similarity index 52% rename from stats/stats/src/read.rs rename to stats/stats/src/charts/db_interaction/read.rs index 529346cc4..3dab1dbf8 100644 --- a/stats/stats/src/read.rs +++ b/stats/stats/src/charts/db_interaction/read.rs @@ -1,19 +1,24 @@ -use crate::{charts::insert::DateValue, missing_date::get_and_fill_chart, MissingDatePolicy}; -use chrono::NaiveDate; +use crate::{ + missing_date::{fill_and_filter_chart, filter_within_range}, + DateValue, ExtendedDateValue, MissingDatePolicy, +}; +use chrono::{Duration, NaiveDate}; use entity::{chart_data, charts}; use sea_orm::{ - ColumnTrait, DatabaseConnection, DbBackend, DbErr, EntityTrait, FromQueryResult, QueryFilter, - QueryOrder, QuerySelect, Statement, + sea_query::Expr, ColumnTrait, DatabaseConnection, DbBackend, DbErr, EntityTrait, + FromQueryResult, QueryFilter, QueryOrder, QuerySelect, Statement, }; use std::collections::HashMap; use thiserror::Error; -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq, Eq)] pub enum ReadError { #[error("database error {0}")] DB(#[from] DbErr), #[error("chart {0} not found")] NotFound(String), + #[error("date interval limit ({0}) is exceeded; choose smaller time interval.")] + IntervalLimitExceeded(Duration), } #[derive(Debug, FromQueryResult)] @@ -57,13 +62,51 @@ pub async fn get_counters( Ok(counters) } +/// Mark corresponding data points as approximate. 
+/// +/// Approximate are: +/// - points after `last_updated_at` date +/// - `approximate_until_updated` dates starting from `last_updated_at` and moving back +/// +/// If `approximate_until_updated=0` - only future points are marked +fn mark_approximate( + data: Vec, + last_updated_at: NaiveDate, + approximate_until_updated: u64, +) -> Vec { + // saturating sub/add + let next_after_updated_at = last_updated_at + .checked_add_days(chrono::Days::new(1)) + .unwrap_or(NaiveDate::MAX); + let mark_from_date = next_after_updated_at + .checked_sub_days(chrono::Days::new(approximate_until_updated)) + .unwrap_or(NaiveDate::MIN); + data.into_iter() + .map(|dv| { + let is_marked = dv.date >= mark_from_date; + ExtendedDateValue::from_date_value(dv, is_marked) + }) + .collect() +} + +/// Get data points for the chart `name`. +/// +/// `approximate_trailing_points` - number of trailing points to mark as approximate. +/// +/// `interval_limit` - max interval (from, to). If `from` or `to` are none, +/// min or max date in DB are calculated. +/// +/// Note: if future dates are specified in interval `(from, to)`, no data points +/// for future dates are returned. pub async fn get_chart_data( db: &DatabaseConnection, name: &str, from: Option, to: Option, + interval_limit: Option, policy: Option, -) -> Result, ReadError> { + approximate_trailing_points: u64, +) -> Result, ReadError> { let chart = charts::Entity::find() .column(charts::Column::Id) .filter(charts::Column::Name.eq(name)) @@ -71,10 +114,34 @@ pub async fn get_chart_data( .await? 
.ok_or_else(|| ReadError::NotFound(name.into()))?; - let data = match policy { - Some(policy) => get_and_fill_chart(db, chart.id, from, to, policy).await?, - None => get_chart(db, chart.id, from, to).await?, + let db_data = get_chart(db, chart.id, from, to).await?; + + let last_updated_at = chart.last_updated_at.map(|t| t.date_naive()); + if last_updated_at.is_none() && !db_data.is_empty() { + tracing::warn!( + chart_name = chart.name, + db_data_len = db_data.len(), + "`last_updated_at` is not set whereas data is present in DB" + ); + } + + // may include future points that were not yet collected and were just filled accordingly. + let data_with_maybe_future = match policy { + Some(policy) => fill_and_filter_chart(db_data, from, to, policy, interval_limit)?, + None => db_data, }; + let data_with_maybe_future_len = data_with_maybe_future.len(); + let data_unmarked = filter_within_range(data_with_maybe_future, None, last_updated_at); + if let Some(filtered) = data_with_maybe_future_len.checked_sub(data_unmarked.len()) { + if filtered > 0 { + tracing::debug!(last_updated_at = ?last_updated_at, "{} future points were removed", filtered); + } + } + let data = mark_approximate( + data_unmarked, + last_updated_at.unwrap_or(NaiveDate::MAX), + approximate_trailing_points, + ); Ok(data) } @@ -84,29 +151,36 @@ async fn get_chart( from: Option, to: Option, ) -> Result, DbErr> { - let data_request = chart_data::Entity::find() + let mut data_request = chart_data::Entity::find() .column(chart_data::Column::Date) .column(chart_data::Column::Value) .filter(chart_data::Column::ChartId.eq(chart_id)) .order_by_asc(chart_data::Column::Date); - let data_request = if let Some(from) = from { - data_request.filter(chart_data::Column::Date.gte(from)) - } else { - data_request - }; - let data_request = if let Some(to) = to { - data_request.filter(chart_data::Column::Date.lte(to)) - } else { - data_request + if let Some(from) = from { + let custom_where = Expr::cust_with_values::( + "date >= 
(SELECT COALESCE(MAX(date), '1900-01-01'::date) FROM chart_data WHERE chart_id = $1 AND date <= $2)", + [chart_id.into(), from.into()], + ); + QuerySelect::query(&mut data_request).cond_where(custom_where); + } + if let Some(to) = to { + let custom_where = Expr::cust_with_values::( + "date <= (SELECT COALESCE(MIN(date), '9999-12-31'::date) FROM chart_data WHERE chart_id = $1 AND date >= $2)", + [chart_id.into(), to.into()], + ); + QuerySelect::query(&mut data_request).cond_where(custom_where); }; - data_request.into_model().all(db).await + + let data = data_request.into_model().all(db).await?; + Ok(data) } #[cfg(test)] mod tests { use super::*; use crate::{counters::TotalBlocks, tests::init_db::init_db, Chart}; + use chrono::DateTime; use entity::{chart_data, charts, sea_orm_active_enums::ChartType}; use pretty_assertions::assert_eq; use sea_orm::{EntityTrait, Set}; @@ -126,11 +200,17 @@ mod tests { charts::ActiveModel { name: Set(TotalBlocks::default().name().to_string()), chart_type: Set(ChartType::Counter), + last_updated_at: Set(Some( + DateTime::parse_from_rfc3339("2022-11-12T08:08:08+00:00").unwrap(), + )), ..Default::default() }, charts::ActiveModel { name: Set("newBlocksPerDay".into()), chart_type: Set(ChartType::Line), + last_updated_at: Set(Some( + DateTime::parse_from_rfc3339("2022-11-12T08:08:08+00:00").unwrap(), + )), ..Default::default() }, ]) @@ -178,22 +258,25 @@ mod tests { let db = init_db("get_chart_int_mock").await; insert_mock_data(&db).await; - let chart = get_chart_data(&db, "newBlocksPerDay", None, None, None) + let chart = get_chart_data(&db, "newBlocksPerDay", None, None, None, None, 1) .await .unwrap(); assert_eq!( vec![ - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-10").unwrap(), value: "100".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-11").unwrap(), value: "150".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: 
NaiveDate::from_str("2022-11-12").unwrap(), value: "200".into(), + is_approximate: true, }, ], chart diff --git a/stats/stats/src/charts/insert.rs b/stats/stats/src/charts/db_interaction/types.rs similarity index 66% rename from stats/stats/src/charts/insert.rs rename to stats/stats/src/charts/db_interaction/types.rs index 584dbfd97..966ade3ad 100644 --- a/stats/stats/src/charts/insert.rs +++ b/stats/stats/src/charts/db_interaction/types.rs @@ -1,8 +1,8 @@ use std::num::ParseIntError; -use chrono::{NaiveDate, Utc}; +use chrono::NaiveDate; use entity::chart_data; -use sea_orm::{prelude::*, sea_query, ConnectionTrait, FromQueryResult, Set}; +use sea_orm::{prelude::*, FromQueryResult, Set}; #[derive(FromQueryResult, Debug, Clone)] pub struct DateValueInt { @@ -89,39 +89,37 @@ impl DateValue { } } - pub fn relevant_or_zero(self) -> DateValue { - let today = Utc::now().date_naive(); - if self.date < today { - DateValue::zero(today) + pub fn relevant_or_zero(self, current_date: NaiveDate) -> DateValue { + if self.date < current_date { + DateValue::zero(current_date) } else { self } } +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct ExtendedDateValue { + pub date: NaiveDate, + pub value: String, + pub is_approximate: bool, +} - pub fn is_partial(&self) -> bool { - let today = Utc::now().date_naive(); - self.date >= today +impl ExtendedDateValue { + pub fn from_date_value(dv: DateValue, is_approximate: bool) -> Self { + Self { + date: dv.date, + value: dv.value, + is_approximate, + } } } -pub async fn insert_data_many(db: &C, data: D) -> Result<(), DbErr> -where - C: ConnectionTrait, - D: IntoIterator + Send + Sync, -{ - let mut data = data.into_iter().peekable(); - if data.peek().is_some() { - chart_data::Entity::insert_many(data) - .on_conflict( - sea_query::OnConflict::columns([ - chart_data::Column::ChartId, - chart_data::Column::Date, - ]) - .update_column(chart_data::Column::Value) - .to_owned(), - ) - .exec(db) - .await?; +impl From for 
DateValue { + fn from(dv: ExtendedDateValue) -> Self { + DateValue { + date: dv.date, + value: dv.value, + } } - Ok(()) } diff --git a/stats/stats/src/charts/db_interaction/write.rs b/stats/stats/src/charts/db_interaction/write.rs new file mode 100644 index 000000000..4ffe941b5 --- /dev/null +++ b/stats/stats/src/charts/db_interaction/write.rs @@ -0,0 +1,24 @@ +use entity::chart_data; +use sea_orm::{prelude::*, sea_query, ConnectionTrait}; + +pub async fn insert_data_many(db: &C, data: D) -> Result<(), DbErr> +where + C: ConnectionTrait, + D: IntoIterator + Send + Sync, +{ + let mut data = data.into_iter().peekable(); + if data.peek().is_some() { + chart_data::Entity::insert_many(data) + .on_conflict( + sea_query::OnConflict::columns([ + chart_data::Column::ChartId, + chart_data::Column::Date, + ]) + .update_column(chart_data::Column::Value) + .to_owned(), + ) + .exec(db) + .await?; + } + Ok(()) +} diff --git a/stats/stats/src/charts/lines/accounts_growth.rs b/stats/stats/src/charts/lines/accounts_growth.rs index a5817eb67..4fa582816 100644 --- a/stats/stats/src/charts/lines/accounts_growth.rs +++ b/stats/stats/src/charts/lines/accounts_growth.rs @@ -2,8 +2,10 @@ use super::NewAccounts; use crate::{ charts::{ cache::Cache, - insert::{DateValue, DateValueInt}, - updater::ChartFullUpdater, + db_interaction::{ + chart_updaters::{ChartFullUpdater, ChartUpdater}, + types::{DateValue, DateValueInt}, + }, }, MissingDatePolicy, UpdateError, }; @@ -62,17 +64,19 @@ impl crate::Chart for AccountsGrowth { fn missing_date_policy(&self) -> MissingDatePolicy { MissingDatePolicy::FillPrevious } - fn drop_last_point(&self) -> bool { - false - } +} - async fn update( +#[async_trait] +impl ChartUpdater for AccountsGrowth { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, 
blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/active_accounts.rs b/stats/stats/src/charts/lines/active_accounts.rs index 144ff6254..b0364a928 100644 --- a/stats/stats/src/charts/lines/active_accounts.rs +++ b/stats/stats/src/charts/lines/active_accounts.rs @@ -1,5 +1,8 @@ use crate::{ - charts::{insert::DateValue, updater::ChartPartialUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; @@ -14,9 +17,9 @@ impl ChartPartialUpdater for ActiveAccounts { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#" @@ -66,14 +69,19 @@ impl crate::Chart for ActiveAccounts { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for ActiveAccounts { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/average_block_rewards.rs b/stats/stats/src/charts/lines/average_block_rewards.rs index fdc547499..571a02ebc 100644 --- a/stats/stats/src/charts/lines/average_block_rewards.rs +++ b/stats/stats/src/charts/lines/average_block_rewards.rs @@ -1,7 +1,7 @@ use crate::{ - charts::{ - insert::{DateValue, DateValueDouble}, - updater::ChartPartialUpdater, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::{DateValue, DateValueDouble}, }, UpdateError, }; @@ -19,9 +19,9 @@ impl ChartPartialUpdater for AverageBlockRewards { 
async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#" @@ -73,14 +73,19 @@ impl crate::Chart for AverageBlockRewards { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for AverageBlockRewards { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/average_block_size.rs b/stats/stats/src/charts/lines/average_block_size.rs index 152863814..f47c72721 100644 --- a/stats/stats/src/charts/lines/average_block_size.rs +++ b/stats/stats/src/charts/lines/average_block_size.rs @@ -1,5 +1,8 @@ use crate::{ - charts::{insert::DateValue, updater::ChartPartialUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; @@ -14,9 +17,9 @@ impl ChartPartialUpdater for AverageBlockSize { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#" @@ -65,14 +68,19 @@ impl crate::Chart for AverageBlockSize { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for AverageBlockSize { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), 
UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/average_gas_limit.rs b/stats/stats/src/charts/lines/average_gas_limit.rs index a28eb64da..a266cad5f 100644 --- a/stats/stats/src/charts/lines/average_gas_limit.rs +++ b/stats/stats/src/charts/lines/average_gas_limit.rs @@ -1,5 +1,8 @@ use crate::{ - charts::{insert::DateValue, updater::ChartPartialUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; @@ -14,9 +17,9 @@ impl ChartPartialUpdater for AverageGasLimit { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#" @@ -65,14 +68,19 @@ impl crate::Chart for AverageGasLimit { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for AverageGasLimit { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/average_gas_price.rs b/stats/stats/src/charts/lines/average_gas_price.rs index bbb1fe059..763cbc48f 100644 --- a/stats/stats/src/charts/lines/average_gas_price.rs +++ b/stats/stats/src/charts/lines/average_gas_price.rs @@ -1,7 +1,7 @@ use crate::{ - charts::{ - insert::{DateValue, DateValueDouble}, - updater::ChartPartialUpdater, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::{DateValue, 
DateValueDouble}, }, UpdateError, }; @@ -19,9 +19,9 @@ impl ChartPartialUpdater for AverageGasPrice { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#" @@ -88,14 +88,19 @@ impl crate::Chart for AverageGasPrice { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for AverageGasPrice { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/average_txn_fee.rs b/stats/stats/src/charts/lines/average_txn_fee.rs index 79cda8f1d..4bcb3c991 100644 --- a/stats/stats/src/charts/lines/average_txn_fee.rs +++ b/stats/stats/src/charts/lines/average_txn_fee.rs @@ -1,7 +1,7 @@ use crate::{ - charts::{ - insert::{DateValue, DateValueDouble}, - updater::ChartPartialUpdater, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::{DateValue, DateValueDouble}, }, UpdateError, }; @@ -19,9 +19,9 @@ impl ChartPartialUpdater for AverageTxnFee { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#" @@ -75,14 +75,19 @@ impl crate::Chart for AverageTxnFee { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for AverageTxnFee { + async fn update_values( &self, db: &DatabaseConnection, blockscout: 
&DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/contracts_growth.rs b/stats/stats/src/charts/lines/contracts_growth.rs index 3e65dd1d1..f2f131e6c 100644 --- a/stats/stats/src/charts/lines/contracts_growth.rs +++ b/stats/stats/src/charts/lines/contracts_growth.rs @@ -3,8 +3,10 @@ use crate::{ charts::{ chart::Chart, create_chart, - insert::DateValue, - updater::{parse_and_growth, ChartDependentUpdater}, + db_interaction::{ + chart_updaters::{parse_and_cumsum, ChartDependentUpdater, ChartUpdater}, + types::DateValue, + }, }, MissingDatePolicy, UpdateError, }; @@ -31,7 +33,7 @@ impl ChartDependentUpdater for ContractsGrowth { } async fn get_values(&self, parent_data: Vec) -> Result, UpdateError> { - parse_and_growth::(parent_data, self.parent.name()) + parse_and_cumsum::(parent_data, self.parent.name()) } } @@ -46,23 +48,24 @@ impl crate::Chart for ContractsGrowth { fn missing_date_policy(&self) -> MissingDatePolicy { MissingDatePolicy::FillPrevious } - fn drop_last_point(&self) -> bool { - false - } async fn create(&self, db: &DatabaseConnection) -> Result<(), DbErr> { self.parent.create(db).await?; create_chart(db, self.name().into(), self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for ContractsGrowth { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await?; - Ok(()) + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/gas_used_growth.rs b/stats/stats/src/charts/lines/gas_used_growth.rs index ef5f6d676..0fd4a1123 100644 --- 
a/stats/stats/src/charts/lines/gas_used_growth.rs +++ b/stats/stats/src/charts/lines/gas_used_growth.rs @@ -1,7 +1,7 @@ use crate::{ - charts::{ - insert::{DateValue, DateValueDecimal}, - updater::ChartPartialUpdater, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::{DateValue, DateValueDecimal}, }, MissingDatePolicy, UpdateError, }; @@ -17,9 +17,9 @@ impl ChartPartialUpdater for GasUsedGrowth { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let data = match last_row { + let data = match last_updated_row { Some(row) => { let last_value = Decimal::from_str_exact(&row.value).map_err(|e| { UpdateError::Internal(format!("failed to parse previous value: {e}")) @@ -90,17 +90,19 @@ impl crate::Chart for GasUsedGrowth { fn missing_date_policy(&self) -> MissingDatePolicy { MissingDatePolicy::FillPrevious } - fn drop_last_point(&self) -> bool { - false - } +} - async fn update( +#[async_trait] +impl ChartUpdater for GasUsedGrowth { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/mock.rs b/stats/stats/src/charts/lines/mock.rs index 34b93ee37..2318111f3 100644 --- a/stats/stats/src/charts/lines/mock.rs +++ b/stats/stats/src/charts/lines/mock.rs @@ -1,5 +1,8 @@ use crate::{ - charts::{insert::DateValue, updater::ChartFullUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartFullUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; @@ -9,10 +12,9 @@ use rand::{distributions::uniform::SampleUniform, rngs::StdRng, Rng, SeedableRng use sea_orm::prelude::*; use std::{ops::Range, str::FromStr}; -fn 
generate_intervals(mut start: NaiveDate) -> Vec { - let now = chrono::offset::Utc::now().naive_utc().date(); +fn generate_intervals(mut start: NaiveDate, end: NaiveDate) -> Vec { let mut times = vec![]; - while start < now { + while start < end { times.push(start); start += Duration::days(1); } @@ -23,17 +25,20 @@ pub fn mocked_lines( range: Range, ) -> Vec { let mut rng = StdRng::seed_from_u64(222); - generate_intervals(NaiveDate::from_str("2022-01-01").unwrap()) - .into_iter() - .map(|date| { - let range = range.clone(); - let value = rng.gen_range(range); - DateValue { - date, - value: value.to_string(), - } - }) - .collect() + generate_intervals( + NaiveDate::from_str("2022-01-01").unwrap(), + NaiveDate::from_str("2022-04-01").unwrap(), + ) + .into_iter() + .map(|date| { + let range = range.clone(); + let value = rng.gen_range(range); + DateValue { + date, + value: value.to_string(), + } + }) + .collect() } #[derive(Debug)] @@ -71,13 +76,20 @@ impl c fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater + for MockLine +{ + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/native_coin_holders_growth.rs b/stats/stats/src/charts/lines/native_coin_holders_growth.rs index c1844199e..2ad31bf62 100644 --- a/stats/stats/src/charts/lines/native_coin_holders_growth.rs +++ b/stats/stats/src/charts/lines/native_coin_holders_growth.rs @@ -1,8 +1,15 @@ use crate::{ charts::{ - create_chart, find_chart, - insert::{insert_data_many, DateValue}, - updater::{get_last_row, get_min_block_blockscout}, + create_chart, + db_interaction::{ + chart_updaters::{ + common_operations::{get_min_block_blockscout, get_nth_last_row}, + 
ChartUpdater, + }, + types::DateValue, + write::insert_data_many, + }, + find_chart, }, Chart, MissingDatePolicy, UpdateError, }; @@ -50,19 +57,24 @@ impl crate::Chart for NativeCoinHoldersGrowth { fn missing_date_policy(&self) -> MissingDatePolicy { MissingDatePolicy::FillPrevious } - fn drop_last_point(&self) -> bool { - true + fn approximate_trailing_points(&self) -> u64 { + // support table contains information of actual last day + 0 } async fn create(&self, db: &DatabaseConnection) -> Result<(), DbErr> { self.create_support_table(db).await?; create_chart(db, self.name().into(), self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for NativeCoinHoldersGrowth { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + _current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { let chart_id = find_chart(db, self.name()) @@ -72,11 +84,9 @@ impl crate::Chart for NativeCoinHoldersGrowth { let min_blockscout_block = get_min_block_blockscout(blockscout) .await .map_err(UpdateError::BlockscoutDB)?; - // settings offset to zero to get actual last row, - // because support table contains information of actual last day - let offset = Some(0); + let offset = Some(self.approximate_trailing_points()); let last_row = - get_last_row(self, chart_id, min_blockscout_block, db, force_full, offset).await?; + get_nth_last_row(self, chart_id, min_blockscout_block, db, force_full, offset).await?; self.update_sequentially_with_support_table( db, blockscout, @@ -121,7 +131,7 @@ impl NativeCoinHoldersGrowth { len = days.len(), first = ?first, last = ?last, - "start fethcing data for days" + "start fetching data for days" ); // NOTE: we update support table and chart data in one transaction // to support invariant that support table has information about last day in chart data diff --git a/stats/stats/src/charts/lines/native_coin_supply.rs b/stats/stats/src/charts/lines/native_coin_supply.rs index 
86fbdfdde..1ada0dac5 100644 --- a/stats/stats/src/charts/lines/native_coin_supply.rs +++ b/stats/stats/src/charts/lines/native_coin_supply.rs @@ -1,7 +1,7 @@ use crate::{ - charts::{ - insert::{DateValue, DateValueDouble}, - updater::ChartPartialUpdater, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::{DateValue, DateValueDouble}, }, UpdateError, }; @@ -19,9 +19,9 @@ impl ChartPartialUpdater for NativeCoinSupply { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r" @@ -84,14 +84,19 @@ impl crate::Chart for NativeCoinSupply { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for NativeCoinSupply { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/new_accounts.rs b/stats/stats/src/charts/lines/new_accounts.rs index 4bd8d736f..6cbed318c 100644 --- a/stats/stats/src/charts/lines/new_accounts.rs +++ b/stats/stats/src/charts/lines/new_accounts.rs @@ -1,8 +1,10 @@ use crate::{ charts::{ cache::Cache, - insert::{DateValue, DateValueInt}, - updater::ChartFullUpdater, + db_interaction::{ + chart_updaters::{ChartFullUpdater, ChartUpdater}, + types::{DateValue, DateValueInt}, + }, }, UpdateError, }; @@ -79,14 +81,19 @@ impl crate::Chart for NewAccounts { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for NewAccounts { + async fn update_values( &self, db: &DatabaseConnection, blockscout: 
&DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/new_blocks.rs b/stats/stats/src/charts/lines/new_blocks.rs index bdc454467..10bb5fb03 100644 --- a/stats/stats/src/charts/lines/new_blocks.rs +++ b/stats/stats/src/charts/lines/new_blocks.rs @@ -1,5 +1,8 @@ use crate::{ - charts::{insert::DateValue, updater::ChartPartialUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; @@ -14,9 +17,9 @@ impl ChartPartialUpdater for NewBlocks { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#" @@ -60,14 +63,19 @@ impl crate::Chart for NewBlocks { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for NewBlocks { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } @@ -75,10 +83,10 @@ impl crate::Chart for NewBlocks { mod tests { use super::*; use crate::{ - charts::updater::get_min_block_blockscout, + charts::db_interaction::chart_updaters::common_operations::get_min_block_blockscout, get_chart_data, tests::{init_db::init_db_all, mock_blockscout::fill_mock_blockscout_data}, - Chart, + Chart, ExtendedDateValue, }; use chrono::NaiveDate; use entity::chart_data; @@ -91,7 +99,9 @@ mod tests { async fn 
update_new_blocks_recurrent() { let _ = tracing_subscriber::fmt::try_init(); let (db, blockscout) = init_db_all("update_new_blocks_recurrent").await; - fill_mock_blockscout_data(&blockscout, "2022-11-12").await; + let current_time = chrono::DateTime::from_str("2022-11-12T12:00:00Z").unwrap(); + let current_date = current_time.date_naive(); + fill_mock_blockscout_data(&blockscout, current_date).await; let updater = NewBlocks::default(); updater.create(&db).await.unwrap(); @@ -119,47 +129,60 @@ mod tests { .unwrap(); // Note that update is not full, therefore there is no entry with date `2022-11-09` - updater.update(&db, &blockscout, false).await.unwrap(); - let data = get_chart_data(&db, updater.name(), None, None, None) + updater + .update(&db, &blockscout, current_time, false) + .await + .unwrap(); + let data = get_chart_data(&db, updater.name(), None, None, None, None, 1) .await .unwrap(); let expected = vec![ - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-10").unwrap(), value: "3".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-11").unwrap(), value: "4".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-12").unwrap(), value: "1".into(), + is_approximate: true, }, ]; assert_eq!(expected, data); // note that update is full, therefore there is entry with date `2022-11-09` - updater.update(&db, &blockscout, true).await.unwrap(); - let data = get_chart_data(&db, updater.name(), None, None, None) + updater + .update(&db, &blockscout, current_time, true) + .await + .unwrap(); + let data = get_chart_data(&db, updater.name(), None, None, None, None, 1) .await .unwrap(); let expected = vec![ - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-09").unwrap(), value: "1".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-10").unwrap(), value: "3".into(), + 
is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-11").unwrap(), value: "4".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-12").unwrap(), value: "1".into(), + is_approximate: true, }, ]; assert_eq!(expected, data); @@ -170,31 +193,40 @@ mod tests { async fn update_new_blocks_fresh() { let _ = tracing_subscriber::fmt::try_init(); let (db, blockscout) = init_db_all("update_new_blocks_fresh").await; - fill_mock_blockscout_data(&blockscout, "2022-11-12").await; + let current_time = chrono::DateTime::from_str("2022-11-12T12:00:00Z").unwrap(); + let current_date = current_time.date_naive(); + fill_mock_blockscout_data(&blockscout, current_date).await; let updater = NewBlocks::default(); updater.create(&db).await.unwrap(); - updater.update(&db, &blockscout, true).await.unwrap(); - let data = get_chart_data(&db, updater.name(), None, None, None) + updater + .update(&db, &blockscout, current_time, true) + .await + .unwrap(); + let data = get_chart_data(&db, updater.name(), None, None, None, None, 0) .await .unwrap(); let expected = vec![ - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-09").unwrap(), value: "1".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-10").unwrap(), value: "3".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-11").unwrap(), value: "4".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-12").unwrap(), value: "1".into(), + is_approximate: false, }, ]; assert_eq!(expected, data); @@ -205,7 +237,9 @@ mod tests { async fn update_new_blocks_last() { let _ = tracing_subscriber::fmt::try_init(); let (db, blockscout) = init_db_all("update_new_blocks_last").await; - fill_mock_blockscout_data(&blockscout, "2022-11-12").await; + let current_time = 
chrono::DateTime::from_str("2022-11-12T12:00:00Z").unwrap(); + let current_date = current_time.date_naive(); + fill_mock_blockscout_data(&blockscout, current_date).await; let updater = NewBlocks::default(); updater.create(&db).await.unwrap(); @@ -247,26 +281,33 @@ mod tests { .await .unwrap(); - updater.update(&db, &blockscout, false).await.unwrap(); - let data = get_chart_data(&db, updater.name(), None, None, None) + updater + .update(&db, &blockscout, current_time, false) + .await + .unwrap(); + let data = get_chart_data(&db, updater.name(), None, None, None, None, 1) .await .unwrap(); let expected = vec![ - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-09").unwrap(), value: "2".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-10").unwrap(), value: "4".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-11").unwrap(), value: "5".into(), + is_approximate: false, }, - DateValue { + ExtendedDateValue { date: NaiveDate::from_str("2022-11-12").unwrap(), value: "1".into(), + is_approximate: true, }, ]; assert_eq!(expected, data); diff --git a/stats/stats/src/charts/lines/new_contracts.rs b/stats/stats/src/charts/lines/new_contracts.rs index 40694abf6..d38df878a 100644 --- a/stats/stats/src/charts/lines/new_contracts.rs +++ b/stats/stats/src/charts/lines/new_contracts.rs @@ -1,4 +1,7 @@ -use crate::{charts::updater::ChartBatchUpdater, UpdateError}; +use crate::{ + charts::db_interaction::chart_updaters::{ChartBatchUpdater, ChartUpdater}, + UpdateError, +}; use async_trait::async_trait; use chrono::NaiveDate; use entity::sea_orm_active_enums::ChartType; @@ -59,14 +62,19 @@ impl crate::Chart for NewContracts { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for NewContracts { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + 
current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/new_native_coin_holders.rs b/stats/stats/src/charts/lines/new_native_coin_holders.rs index 1a3a1eb81..5db86e70a 100644 --- a/stats/stats/src/charts/lines/new_native_coin_holders.rs +++ b/stats/stats/src/charts/lines/new_native_coin_holders.rs @@ -2,8 +2,10 @@ use super::NativeCoinHoldersGrowth; use crate::{ charts::{ create_chart, - insert::{DateValue, DateValueInt}, - updater::ChartDependentUpdater, + db_interaction::{ + chart_updaters::{ChartDependentUpdater, ChartUpdater}, + types::{DateValue, DateValueInt}, + }, Chart, }, UpdateError, @@ -70,14 +72,19 @@ impl Chart for NewNativeCoinHolders { self.parent.create(db).await?; create_chart(db, self.name().into(), self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for NewNativeCoinHolders { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/new_native_coin_transfers.rs b/stats/stats/src/charts/lines/new_native_coin_transfers.rs index fc3ca7540..98c1ed031 100644 --- a/stats/stats/src/charts/lines/new_native_coin_transfers.rs +++ b/stats/stats/src/charts/lines/new_native_coin_transfers.rs @@ -1,5 +1,8 @@ use crate::{ - charts::{insert::DateValue, updater::ChartPartialUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; @@ -14,9 +17,9 @@ impl ChartPartialUpdater for NewNativeCoinTransfers { async fn get_values( &self, 
blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#" @@ -71,14 +74,19 @@ impl crate::Chart for NewNativeCoinTransfers { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for NewNativeCoinTransfers { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, - full: bool, + current_time: chrono::DateTime, + force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/new_txns.rs b/stats/stats/src/charts/lines/new_txns.rs index eba0d2eda..636c31980 100644 --- a/stats/stats/src/charts/lines/new_txns.rs +++ b/stats/stats/src/charts/lines/new_txns.rs @@ -1,5 +1,8 @@ use crate::{ - charts::{insert::DateValue, updater::ChartPartialUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; @@ -14,9 +17,9 @@ impl ChartPartialUpdater for NewTxns { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#" @@ -67,14 +70,19 @@ impl crate::Chart for NewTxns { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for NewTxns { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + 
self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/new_verified_contracts.rs b/stats/stats/src/charts/lines/new_verified_contracts.rs index 7052839b0..17b0a85de 100644 --- a/stats/stats/src/charts/lines/new_verified_contracts.rs +++ b/stats/stats/src/charts/lines/new_verified_contracts.rs @@ -1,8 +1,12 @@ use crate::{ - charts::{insert::DateValue, updater::ChartPartialUpdater}, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::DateValue, + }, UpdateError, }; use async_trait::async_trait; +use chrono::Utc; use entity::sea_orm_active_enums::ChartType; use sea_orm::{prelude::*, DbBackend, FromQueryResult, Statement}; @@ -14,9 +18,9 @@ impl ChartPartialUpdater for NewVerifiedContracts { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#"SELECT @@ -55,14 +59,19 @@ impl crate::Chart for NewVerifiedContracts { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for NewVerifiedContracts { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/txns_fee.rs b/stats/stats/src/charts/lines/txns_fee.rs index 6107112f8..fa0c5476d 100644 --- a/stats/stats/src/charts/lines/txns_fee.rs +++ b/stats/stats/src/charts/lines/txns_fee.rs @@ -1,7 +1,7 @@ use crate::{ - charts::{ - insert::{DateValue, DateValueDouble}, - updater::ChartPartialUpdater, + charts::db_interaction::{ + 
chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::{DateValue, DateValueDouble}, }, UpdateError, }; @@ -19,9 +19,9 @@ impl ChartPartialUpdater for TxnsFee { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#" @@ -75,14 +75,19 @@ impl crate::Chart for TxnsFee { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for TxnsFee { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/txns_growth.rs b/stats/stats/src/charts/lines/txns_growth.rs index abc1323a3..5e17a1d0f 100644 --- a/stats/stats/src/charts/lines/txns_growth.rs +++ b/stats/stats/src/charts/lines/txns_growth.rs @@ -3,8 +3,10 @@ use crate::{ charts::{ chart::Chart, create_chart, - insert::DateValue, - updater::{parse_and_growth, ChartDependentUpdater}, + db_interaction::{ + chart_updaters::{parse_and_cumsum, ChartDependentUpdater, ChartUpdater}, + types::DateValue, + }, }, MissingDatePolicy, UpdateError, }; @@ -31,7 +33,7 @@ impl ChartDependentUpdater for TxnsGrowth { } async fn get_values(&self, parent_data: Vec) -> Result, UpdateError> { - parse_and_growth::(parent_data, self.parent.name()) + parse_and_cumsum::(parent_data, self.parent.name()) } } @@ -46,23 +48,24 @@ impl crate::Chart for TxnsGrowth { fn missing_date_policy(&self) -> MissingDatePolicy { MissingDatePolicy::FillPrevious } - fn drop_last_point(&self) -> bool { - false - } async fn create(&self, db: &DatabaseConnection) -> Result<(), DbErr> { 
self.parent.create(db).await?; create_chart(db, self.name().into(), self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for TxnsGrowth { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await?; - Ok(()) + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/txns_success_rate.rs b/stats/stats/src/charts/lines/txns_success_rate.rs index 102336650..f57c690cc 100644 --- a/stats/stats/src/charts/lines/txns_success_rate.rs +++ b/stats/stats/src/charts/lines/txns_success_rate.rs @@ -1,7 +1,7 @@ use crate::{ - charts::{ - insert::{DateValue, DateValueDouble}, - updater::ChartPartialUpdater, + charts::db_interaction::{ + chart_updaters::{ChartPartialUpdater, ChartUpdater}, + types::{DateValue, DateValueDouble}, }, UpdateError, }; @@ -17,9 +17,9 @@ impl ChartPartialUpdater for TxnsSuccessRate { async fn get_values( &self, blockscout: &DatabaseConnection, - last_row: Option, + last_updated_row: Option, ) -> Result, UpdateError> { - let stmnt = match last_row { + let stmnt = match last_updated_row { Some(row) => Statement::from_sql_and_values( DbBackend::Postgres, r#" @@ -79,14 +79,19 @@ impl crate::Chart for TxnsSuccessRate { fn chart_type(&self) -> ChartType { ChartType::Line } +} - async fn update( +#[async_trait] +impl ChartUpdater for TxnsSuccessRate { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/lines/verified_contracts_growth.rs b/stats/stats/src/charts/lines/verified_contracts_growth.rs index 
15b2be20e..080d7b35d 100644 --- a/stats/stats/src/charts/lines/verified_contracts_growth.rs +++ b/stats/stats/src/charts/lines/verified_contracts_growth.rs @@ -3,8 +3,10 @@ use crate::{ charts::{ chart::Chart, create_chart, - insert::DateValue, - updater::{parse_and_growth, ChartDependentUpdater}, + db_interaction::{ + chart_updaters::{parse_and_cumsum, ChartDependentUpdater, ChartUpdater}, + types::DateValue, + }, }, MissingDatePolicy, UpdateError, }; @@ -31,7 +33,7 @@ impl ChartDependentUpdater for VerifiedContractsGrowth { } async fn get_values(&self, parent_data: Vec) -> Result, UpdateError> { - parse_and_growth::(parent_data, self.parent.name()) + parse_and_cumsum::(parent_data, self.parent.name()) } } @@ -46,23 +48,24 @@ impl crate::Chart for VerifiedContractsGrowth { fn missing_date_policy(&self) -> MissingDatePolicy { MissingDatePolicy::FillPrevious } - fn drop_last_point(&self) -> bool { - false - } async fn create(&self, db: &DatabaseConnection) -> Result<(), DbErr> { self.parent.create(db).await?; create_chart(db, self.name().into(), self.chart_type()).await } +} - async fn update( +#[async_trait] +impl ChartUpdater for VerifiedContractsGrowth { + async fn update_values( &self, db: &DatabaseConnection, blockscout: &DatabaseConnection, + current_time: chrono::DateTime, force_full: bool, ) -> Result<(), UpdateError> { - self.update_with_values(db, blockscout, force_full).await?; - Ok(()) + self.update_with_values(db, blockscout, current_time, force_full) + .await } } diff --git a/stats/stats/src/charts/mod.rs b/stats/stats/src/charts/mod.rs index 335cb6f0c..268ea5db3 100644 --- a/stats/stats/src/charts/mod.rs +++ b/stats/stats/src/charts/mod.rs @@ -1,9 +1,8 @@ pub mod cache; mod chart; pub mod counters; -pub mod insert; +pub mod db_interaction; pub mod lines; mod mutex; -pub mod updater; - pub use chart::{create_chart, find_chart, Chart, MissingDatePolicy, UpdateError}; +pub use db_interaction::chart_updaters::ChartUpdater; diff --git 
a/stats/stats/src/lib.rs b/stats/stats/src/lib.rs index 6977428ad..468100cfc 100644 --- a/stats/stats/src/lib.rs +++ b/stats/stats/src/lib.rs @@ -1,6 +1,5 @@ mod charts; mod missing_date; -mod read; pub mod metrics; #[cfg(feature = "test-utils")] @@ -10,6 +9,10 @@ pub use entity; pub use migration; pub use charts::{ - cache, counters, insert::DateValue, lines, Chart, MissingDatePolicy, UpdateError, + cache, counters, + db_interaction::{ + read::{get_chart_data, get_counters, ReadError}, + types::{DateValue, ExtendedDateValue}, + }, + lines, Chart, ChartUpdater, MissingDatePolicy, UpdateError, }; -pub use read::{get_chart_data, get_counters, ReadError}; diff --git a/stats/stats/src/missing_date.rs b/stats/stats/src/missing_date.rs index e4c4cd7ca..b64504546 100644 --- a/stats/stats/src/missing_date.rs +++ b/stats/stats/src/missing_date.rs @@ -1,49 +1,44 @@ -use crate::{DateValue, MissingDatePolicy}; +use crate::{DateValue, MissingDatePolicy, ReadError}; use chrono::{Duration, NaiveDate}; -use entity::chart_data; -use sea_orm::{prelude::*, sea_query::Expr, QueryOrder, QuerySelect}; -pub async fn get_and_fill_chart( - db: &DatabaseConnection, - chart_id: i32, +/// Fills missing points according to policy and filters out points outside of range. +/// +/// Note that values outside of the range can still affect the filled values. 
+pub fn fill_and_filter_chart( + data: Vec, from: Option, to: Option, policy: MissingDatePolicy, -) -> Result, DbErr> { - let mut data_request = chart_data::Entity::find() - .select_only() - .column(chart_data::Column::Date) - .column(chart_data::Column::Value) - .filter(chart_data::Column::ChartId.eq(chart_id)) - .order_by_asc(chart_data::Column::Date); - - if let Some(from) = from { - let custom_where = Expr::cust_with_values::( - "date >= (SELECT COALESCE(MAX(date), '1900-01-01'::date) FROM chart_data WHERE chart_id = $1 AND date <= $2)", - [chart_id.into(), from.into()], - ); - QuerySelect::query(&mut data_request).cond_where(custom_where); + interval_limit: Option, +) -> Result, ReadError> { + let retrieved_count = data.len(); + let data_filled = fill_missing_points(data, policy, from, to, interval_limit)?; + if let Some(filled_count) = data_filled.len().checked_sub(retrieved_count) { + if filled_count > 0 { + tracing::debug!(policy = ?policy, "{} missing points were filled", filled_count); + } } - if let Some(to) = to { - let custom_where = Expr::cust_with_values::( - "date <= (SELECT COALESCE(MIN(date), '9999-12-31'::date) FROM chart_data WHERE chart_id = $1 AND date >= $2)", - [chart_id.into(), to.into()], - ); - QuerySelect::query(&mut data_request).cond_where(custom_where); - }; - - let data_with_extra = data_request.into_model().all(db).await?; - let data_filled = fill_missing_points(data_with_extra, policy, from, to); - let data = filter_within_range(data_filled, from, to); - Ok(data) + let filled_len = data_filled.len(); + let data_filtered = filter_within_range(data_filled, from, to); + if let Some(filtered) = filled_len.checked_sub(data_filtered.len()) { + if filtered > 0 { + tracing::debug!(range = ?(from, to), "{} points outside of range were removed", filtered); + } + } + Ok(data_filtered) } +/// Fills values for all dates from `min(data.first(), from)` to `max(data.last(), to)` according +/// to `policy`. 
+/// +/// See [`fill_zeros`] and [`fill_previous`] for details on the policies. pub fn fill_missing_points( data: Vec, policy: MissingDatePolicy, from: Option, to: Option, -) -> Vec { + interval_limit: Option, +) -> Result, ReadError> { let from = vec![from.as_ref(), data.first().map(|v| &v.date)] .into_iter() .flatten() @@ -54,16 +49,24 @@ pub fn fill_missing_points( .max(); let (from, to) = match (from, to) { (Some(from), Some(to)) if from <= to => (from.to_owned(), to.to_owned()), - _ => return data, + // data is empty or ill-formed + _ => return Ok(data), }; - match policy { - MissingDatePolicy::FillZero => fill_zeros(data, from, to), - MissingDatePolicy::FillPrevious => fill_previous(data, from, to), + if let Some(interval_limit) = interval_limit { + if to - from > interval_limit { + return Err(ReadError::IntervalLimitExceeded(interval_limit)); + } } + + Ok(match policy { + MissingDatePolicy::FillZero => filled_zeros_data(&data, from, to), + MissingDatePolicy::FillPrevious => filled_previous_data(&data, from, to), + }) } -fn fill_zeros(data: Vec, from: NaiveDate, to: NaiveDate) -> Vec { +/// Inserts zero values in `data` for all missing dates in inclusive range `[from; to]` +fn filled_zeros_data(data: &[DateValue], from: NaiveDate, to: NaiveDate) -> Vec { let n = (to - from).num_days() as usize; let mut new_data: Vec = Vec::with_capacity(n); @@ -86,7 +89,9 @@ fn fill_zeros(data: Vec, from: NaiveDate, to: NaiveDate) -> Vec, from: NaiveDate, to: NaiveDate) -> Vec { +/// Inserts last existing values in `data` for all missing dates in inclusive range `[from; to]`. +/// For all leading missing dates inserts zero. 
+fn filled_previous_data(data: &[DateValue], from: NaiveDate, to: NaiveDate) -> Vec { let n = (to - from).num_days() as usize; let mut new_data: Vec = Vec::with_capacity(n); let mut current_date = from; @@ -112,7 +117,7 @@ fn fill_previous(data: Vec, from: NaiveDate, to: NaiveDate) -> Vec, maybe_from: Option, maybe_to: Option, @@ -167,6 +172,24 @@ mod tests { Some(d("2022-01-01")), Some(d("2022-01-01")), ), + ( + vec![v("2022-01-01", "01"), v("2022-01-02", "02")], + vec![v("2022-01-01", "01"), v("2022-01-02", "02")], + Some(d("2022-01-01")), + Some(d("2022-01-02")), + ), + ( + vec![v("2022-01-01", "01")], + vec![v("2022-01-01", "01"), v("2022-01-02", "0")], + Some(d("2022-01-01")), + Some(d("2022-01-02")), + ), + ( + vec![v("2022-01-02", "02")], + vec![v("2022-01-01", "0"), v("2022-01-02", "02")], + Some(d("2022-01-01")), + Some(d("2022-01-02")), + ), ( vec![ v("2022-08-20", "20"), @@ -206,8 +229,26 @@ mod tests { Some(d("2023-07-12")), Some(d("2023-07-14")), ), + ( + vec![ + v("2023-07-10", "10"), + v("2023-07-12", "12"), + v("2023-07-15", "12"), + ], + vec![ + v("2023-07-10", "10"), + v("2023-07-11", "0"), + v("2023-07-12", "12"), + v("2023-07-13", "0"), + v("2023-07-14", "0"), + v("2023-07-15", "12"), + ], + Some(d("2023-07-12")), + Some(d("2023-07-13")), + ), ] { - let actual = fill_missing_points(data, MissingDatePolicy::FillZero, from, to); + let actual = + fill_missing_points(data, MissingDatePolicy::FillZero, from, to, None).unwrap(); assert_eq!(expected, actual) } } @@ -229,6 +270,24 @@ mod tests { Some(d("2022-01-01")), Some(d("2022-01-01")), ), + ( + vec![v("2022-01-01", "01"), v("2022-01-02", "02")], + vec![v("2022-01-01", "01"), v("2022-01-02", "02")], + Some(d("2022-01-01")), + Some(d("2022-01-02")), + ), + ( + vec![v("2022-01-01", "01")], + vec![v("2022-01-01", "01"), v("2022-01-02", "01")], + Some(d("2022-01-01")), + Some(d("2022-01-02")), + ), + ( + vec![v("2022-01-02", "02")], + vec![v("2022-01-01", "0"), v("2022-01-02", "02")], + 
Some(d("2022-01-01")), + Some(d("2022-01-02")), + ), ( vec![ v("2022-08-20", "20"), @@ -251,9 +310,111 @@ mod tests { Some(d("2022-08-18")), Some(d("2022-08-27")), ), + ( + vec![ + v("2023-07-10", "10"), + v("2023-07-12", "12"), + v("2023-07-15", "12"), + ], + vec![ + v("2023-07-10", "10"), + v("2023-07-11", "10"), + v("2023-07-12", "12"), + v("2023-07-13", "12"), + v("2023-07-14", "12"), + v("2023-07-15", "12"), + ], + Some(d("2023-07-12")), + Some(d("2023-07-14")), + ), + ( + vec![ + v("2023-07-10", "10"), + v("2023-07-12", "12"), + v("2023-07-15", "12"), + ], + vec![ + v("2023-07-10", "10"), + v("2023-07-11", "10"), + v("2023-07-12", "12"), + v("2023-07-13", "12"), + v("2023-07-14", "12"), + v("2023-07-15", "12"), + ], + Some(d("2023-07-12")), + Some(d("2023-07-13")), + ), ] { - let actual = fill_missing_points(data, MissingDatePolicy::FillPrevious, from, to); + let actual = + fill_missing_points(data, MissingDatePolicy::FillPrevious, from, to, None).unwrap(); assert_eq!(expected, actual); } } + + #[test] + fn limits_are_respected() { + let limit = Duration::days(4); + assert_eq!( + fill_missing_points( + vec![ + v("2023-07-10", "10"), + v("2023-07-12", "12"), + v("2023-07-15", "12"), + ], + MissingDatePolicy::FillZero, + Some(d("2023-07-12")), + Some(d("2023-07-12")), + Some(limit) + ), + Err(ReadError::IntervalLimitExceeded(limit)) + ); + assert_eq!( + fill_missing_points( + vec![ + v("2023-07-10", "10"), + v("2023-07-12", "12"), + v("2023-07-14", "12"), + ], + MissingDatePolicy::FillZero, + Some(d("2023-07-10")), + Some(d("2023-07-14")), + Some(limit) + ), + Ok(vec![ + v("2023-07-10", "10"), + v("2023-07-11", "0"), + v("2023-07-12", "12"), + v("2023-07-13", "0"), + v("2023-07-14", "12"), + ],) + ); + assert_eq!( + fill_missing_points( + vec![ + v("2023-07-10", "10"), + v("2023-07-12", "12"), + v("2023-07-14", "12"), + ], + MissingDatePolicy::FillZero, + Some(d("2023-07-10")), + Some(d("2023-07-15")), + Some(limit) + ), + 
Err(ReadError::IntervalLimitExceeded(limit)) + ); + assert_eq!( + fill_missing_points( + vec![ + v("2023-07-10", "10"), + v("2023-07-12", "12"), + v("2023-07-14", "12"), + ], + MissingDatePolicy::FillZero, + Some(d("2023-07-09")), + Some(d("2023-07-14")), + Some(limit) + ), + Err(ReadError::IntervalLimitExceeded(limit)) + ); + } } diff --git a/stats/stats/src/tests/mock_blockscout.rs b/stats/stats/src/tests/mock_blockscout.rs index b149f10cd..7c51df636 100644 --- a/stats/stats/src/tests/mock_blockscout.rs +++ b/stats/stats/src/tests/mock_blockscout.rs @@ -6,7 +6,7 @@ use chrono::{NaiveDate, NaiveDateTime}; use sea_orm::{prelude::Decimal, ActiveValue::NotSet, DatabaseConnection, EntityTrait, Set}; use std::str::FromStr; -pub async fn fill_mock_blockscout_data(blockscout: &DatabaseConnection, max_date: &str) { +pub async fn fill_mock_blockscout_data(blockscout: &DatabaseConnection, max_date: NaiveDate) { addresses::Entity::insert_many([ addresses::ActiveModel { hash: Set(vec![]), @@ -41,9 +41,7 @@ pub async fn fill_mock_blockscout_data(blockscout: &DatabaseConnection, max_date "2023-03-01T10:00:00", ] .into_iter() - .filter(|val| { - NaiveDateTime::from_str(val).unwrap().date() <= NaiveDate::from_str(max_date).unwrap() - }) + .filter(|val| NaiveDateTime::from_str(val).unwrap().date() <= max_date) .enumerate() .map(|(ind, ts)| mock_block(ind as i64, ts, true)) .collect::>(); @@ -199,9 +197,7 @@ pub async fn fill_mock_blockscout_data(blockscout: &DatabaseConnection, max_date "2022-11-08T12:00:00", ] .into_iter() - .filter(|val| { - NaiveDateTime::from_str(val).unwrap().date() <= NaiveDate::from_str(max_date).unwrap() - }) + .filter(|val| NaiveDateTime::from_str(val).unwrap().date() <= max_date) .enumerate() .map(|(ind, ts)| mock_block((ind + blocks.len()) as i64, ts, false)); blocks::Entity::insert_many(useless_blocks) diff --git a/stats/stats/src/tests/simple_test.rs b/stats/stats/src/tests/simple_test.rs index df8f2075b..2978f4035 100644 --- 
a/stats/stats/src/tests/simple_test.rs +++ b/stats/stats/src/tests/simple_test.rs @@ -1,40 +1,101 @@ use super::{init_db::init_db_all, mock_blockscout::fill_mock_blockscout_data}; -use crate::{get_chart_data, get_counters, Chart, MissingDatePolicy}; -use chrono::NaiveDate; +use crate::{ + charts::db_interaction::chart_updaters::ChartUpdater, get_chart_data, get_counters, Chart, + MissingDatePolicy, +}; +use chrono::{DateTime, NaiveDate}; use sea_orm::DatabaseConnection; -use std::assert_eq; +use std::{assert_eq, str::FromStr}; -pub async fn simple_test_chart(test_name: &str, chart: impl Chart, expected: Vec<(&str, &str)>) { +pub async fn simple_test_chart( + test_name: &str, + chart: impl ChartUpdater, + expected: Vec<(&str, &str)>, +) { let _ = tracing_subscriber::fmt::try_init(); let (db, blockscout) = init_db_all(test_name).await; + let current_time = DateTime::from_str("2023-03-01T12:00:00Z").unwrap(); + let current_date = current_time.date_naive(); chart.create(&db).await.unwrap(); - fill_mock_blockscout_data(&blockscout, "2023-03-01").await; + fill_mock_blockscout_data(&blockscout, current_date).await; + let approximate_trailing_points = chart.approximate_trailing_points(); - chart.update(&db, &blockscout, true).await.unwrap(); - get_chart_and_assert_eq(&db, &chart, &expected, None, None, None).await; + chart + .update(&db, &blockscout, current_time, true) + .await + .unwrap(); + get_chart_and_assert_eq( + &db, + &chart, + &expected, + None, + None, + None, + approximate_trailing_points, + ) + .await; - chart.update(&db, &blockscout, false).await.unwrap(); - get_chart_and_assert_eq(&db, &chart, &expected, None, None, None).await; + chart + .update(&db, &blockscout, current_time, false) + .await + .unwrap(); + get_chart_and_assert_eq( + &db, + &chart, + &expected, + None, + None, + None, + approximate_trailing_points, + ) + .await; } pub async fn ranged_test_chart( test_name: &str, - chart: impl Chart, + chart: impl ChartUpdater, expected: Vec<(&str, &str)>, 
from: NaiveDate, to: NaiveDate, ) { let _ = tracing_subscriber::fmt::try_init(); let (db, blockscout) = init_db_all(test_name).await; + let current_time = DateTime::from_str("2023-03-01T12:00:00Z").unwrap(); + let current_date = current_time.date_naive(); chart.create(&db).await.unwrap(); - fill_mock_blockscout_data(&blockscout, "2023-03-01").await; + fill_mock_blockscout_data(&blockscout, current_date).await; let policy = chart.missing_date_policy(); + let approximate_trailing_points = chart.approximate_trailing_points(); - chart.update(&db, &blockscout, true).await.unwrap(); - get_chart_and_assert_eq(&db, &chart, &expected, Some(from), Some(to), Some(policy)).await; + chart + .update(&db, &blockscout, current_time, true) + .await + .unwrap(); + get_chart_and_assert_eq( + &db, + &chart, + &expected, + Some(from), + Some(to), + Some(policy), + approximate_trailing_points, + ) + .await; - chart.update(&db, &blockscout, false).await.unwrap(); - get_chart_and_assert_eq(&db, &chart, &expected, Some(from), Some(to), Some(policy)).await; + chart + .update(&db, &blockscout, current_time, false) + .await + .unwrap(); + get_chart_and_assert_eq( + &db, + &chart, + &expected, + Some(from), + Some(to), + Some(policy), + approximate_trailing_points, + ) + .await; } async fn get_chart_and_assert_eq( @@ -44,10 +105,19 @@ async fn get_chart_and_assert_eq( from: Option, to: Option, policy: Option, + approximate_trailing_points: u64, ) { - let data = get_chart_data(db, chart.name(), from, to, policy) - .await - .unwrap(); + let data = get_chart_data( + db, + chart.name(), + from, + to, + None, + policy, + approximate_trailing_points, + ) + .await + .unwrap(); let data: Vec<_> = data .into_iter() .map(|p| (p.date.to_string(), p.value)) @@ -59,16 +129,25 @@ async fn get_chart_and_assert_eq( assert_eq!(expected, &data); } -pub async fn simple_test_counter(test_name: &str, counter: impl Chart, expected: &str) { +pub async fn simple_test_counter(test_name: &str, counter: impl 
ChartUpdater, expected: &str) { let _ = tracing_subscriber::fmt::try_init(); let (db, blockscout) = init_db_all(test_name).await; + let current_time = chrono::DateTime::from_str("2023-03-01T12:00:00Z").unwrap(); + let current_date = current_time.date_naive(); + counter.create(&db).await.unwrap(); - fill_mock_blockscout_data(&blockscout, "2023-03-01").await; + fill_mock_blockscout_data(&blockscout, current_date).await; - counter.update(&db, &blockscout, true).await.unwrap(); + counter + .update(&db, &blockscout, current_time, true) + .await + .unwrap(); get_counter_and_assert_eq(&db, &counter, expected).await; - counter.update(&db, &blockscout, false).await.unwrap(); + counter + .update(&db, &blockscout, current_time, false) + .await + .unwrap(); get_counter_and_assert_eq(&db, &counter, expected).await; }