Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stats): Backend charts & Migration: total_txns and total_addresses #1144

Merged
merged 9 commits into from
Dec 11, 2024
3 changes: 2 additions & 1 deletion stats/config/update_groups.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
"active_accounts_group": "0 0 4 * * * *",
"average_block_time_group": "0 */10 * * * * *",
"completed_txns_group": "0 5 */3 * * * *",
"total_addresses_group": "0 0 */3 * * * *",
"total_addresses_group": "0 0,30 * * * * *",
"total_blocks_group": "0 0 */2 * * * *",
"total_tokens_group": "0 0 18 * * * *",
"total_txns_group": "0 5 */2 * * * *",
"yesterday_txns_group": "0 8 0 * * * *",
"active_recurring_accounts_daily_recurrence_60_days_group": "0 0 2 * * * *",
"active_recurring_accounts_daily_recurrence_90_days_group": "0 20 2 * * * *",
Expand Down
1 change: 1 addition & 0 deletions stats/stats-server/src/runtime_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ impl RuntimeSetup {
Arc::new(TotalAddressesGroup),
Arc::new(TotalBlocksGroup),
Arc::new(TotalTokensGroup),
Arc::new(TotalTxnsGroup),
bragov4ik marked this conversation as resolved.
Show resolved Hide resolved
Arc::new(YesterdayTxnsGroup),
Arc::new(ActiveRecurringAccountsDailyRecurrence60DaysGroup),
Arc::new(ActiveRecurringAccountsMonthlyRecurrence60DaysGroup),
Expand Down
12 changes: 8 additions & 4 deletions stats/stats-server/tests/it/indexing_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use blockscout_service_launcher::{
launcher::ConfigSettings,
test_server::{get_test_server_settings, init_server, send_get_request},
};
use itertools::Itertools;
use pretty_assertions::assert_eq;

use stats::tests::{init_db::init_db_all, mock_blockscout::mock_blockscout_api};
Expand Down Expand Up @@ -85,8 +86,11 @@ async fn test_not_indexed_ok() {
}
}

let mut counters: Counters = send_get_request(&base, "/api/v1/counters").await;
// totalBlocks has fallback with estimate, so it should always return
assert_eq!(counters.counters.pop().unwrap().id, "totalBlocks");
assert_eq!(counters.counters, vec![])
let counters: Counters = send_get_request(&base, "/api/v1/counters").await;
// returns only counters with fallback query logic,
// so they are returned even without calling an update
assert_eq!(
counters.counters.into_iter().map(|c| c.id).collect_vec(),
vec!["totalAddresses", "totalBlocks", "totalTxns"]
)
}
48 changes: 42 additions & 6 deletions stats/stats/src/charts/counters/total_addresses.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use crate::{
charts::db_interaction::read::query_estimated_table_rows,
data_source::{
kinds::{
local_db::DirectPointLocalDbChartSource,
local_db::{parameters::ValueEstimation, DirectPointLocalDbChartSourceWithEstimate},
remote_db::{PullOne, RemoteDatabaseSource, StatementForOne},
},
types::BlockscoutMigrations,
},
ChartProperties, MissingDatePolicy, Named,
types::timespans::DateValue,
utils::MarkedDbConnection,
ChartError, ChartProperties, MissingDatePolicy, Named,
};
use chrono::NaiveDate;
use blockscout_db::entity::addresses;
use chrono::{NaiveDate, Utc};
use entity::sea_orm_active_enums::ChartType;
use sea_orm::{DbBackend, Statement};
use sea_orm::{DbBackend, EntityName, Statement};

pub struct TotalAddressesStatement;

Expand Down Expand Up @@ -57,16 +61,48 @@ impl ChartProperties for Properties {
}
}

pub type TotalAddresses = DirectPointLocalDbChartSource<TotalAddressesRemote, Properties>;
/// Marker type implementing `ValueEstimation` for the total addresses counter.
pub struct TotalAddressesEstimation;

impl ValueEstimation for TotalAddressesEstimation {
    /// Builds an estimated "total addresses" point from the estimated row count of the
    /// `addresses` table, dated with the current UTC date.
    ///
    /// # Errors
    /// Returns `ChartError::BlockscoutDB` if the row-estimate query fails.
    async fn estimate(blockscout: &MarkedDbConnection) -> Result<DateValue<String>, ChartError> {
        // Take the timestamp right before querying: it is more relevant than
        // `cx.time`, which is measured a bit earlier.
        let queried_at = Utc::now();
        let table = addresses::Entity.table_name();
        let estimated_rows = query_estimated_table_rows(blockscout.connection.as_ref(), table)
            .await
            .map_err(ChartError::BlockscoutDB)?;
        // A missing estimate, or one that cannot be converted to `u64`, is reported as 0.
        let total: u64 = estimated_rows
            .and_then(|rows| u64::try_from(rows).ok())
            .unwrap_or(0);
        Ok(DateValue {
            timespan: queried_at.date_naive(),
            value: total.to_string(),
        })
    }
}

/// Total addresses counter, assembled from the remote query source
/// (`TotalAddressesRemote`), the row-count estimation (`TotalAddressesEstimation`)
/// and the chart `Properties`.
///
/// NOTE(review): presumably the estimation is served as a fallback while no
/// stored value exists — confirm against `DirectPointLocalDbChartSourceWithEstimate`.
pub type TotalAddresses = DirectPointLocalDbChartSourceWithEstimate<
TotalAddressesRemote,
TotalAddressesEstimation,
Properties,
>;

#[cfg(test)]
mod tests {
use super::*;
use crate::tests::simple_test::simple_test_counter;
use crate::tests::simple_test::{simple_test_counter, test_counter_fallback};

// Runs the shared counter test for `TotalAddresses`; "33" is presumably the value
// expected for the mock Blockscout data — verify against `simple_test_counter`.
#[tokio::test]
#[ignore = "needs database to run"]
async fn update_total_addresses() {
simple_test_counter::<TotalAddresses>("update_total_addresses", "33", None).await;
}

// Checks that `TotalAddresses` yields a non-zero fallback value when Blockscout
// data is populated but no update has been called on the counter.
#[tokio::test]
#[ignore = "needs database to run"]
async fn total_addresses_fallback() {
test_counter_fallback::<TotalAddresses>("total_addresses_fallback").await;
}
}
41 changes: 8 additions & 33 deletions stats/stats/src/charts/counters/total_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ pub struct TotalBlocksEstimation;

impl ValueEstimation for TotalBlocksEstimation {
async fn estimate(blockscout: &MarkedDbConnection) -> Result<DateValue<String>, ChartError> {
// `now()` is more relevant when taken right before the query rather than
// `cx.time` measured a bit earlier.
let now = Utc::now();
let value =
query_estimated_table_rows(blockscout.connection.as_ref(), blocks::Entity.table_name())
Expand All @@ -103,14 +105,15 @@ mod tests {
use crate::{
data_source::{types::BlockscoutMigrations, DataSource, UpdateContext, UpdateParameters},
tests::{
init_db::init_marked_db_all, mock_blockscout::fill_mock_blockscout_data,
simple_test::get_counter,
init_db::init_marked_db_all,
mock_blockscout::fill_mock_blockscout_data,
simple_test::{get_counter, test_counter_fallback},
},
};
use chrono::NaiveDate;
use entity::chart_data;
use pretty_assertions::{assert_eq, assert_ne};
use sea_orm::{DatabaseConnection, DbBackend, EntityTrait, Set, Statement};
use pretty_assertions::assert_eq;
use sea_orm::{DatabaseConnection, EntityTrait, Set};
use std::str::FromStr;

#[tokio::test]
Expand Down Expand Up @@ -217,34 +220,6 @@ mod tests {
#[tokio::test]
#[ignore = "needs database to run"]
async fn total_blocks_fallback() {
let _ = tracing_subscriber::fmt::try_init();
let (db, blockscout) = init_marked_db_all("total_blocks_fallback").await;
let current_time = chrono::DateTime::from_str("2023-03-01T12:00:00Z").unwrap();
let current_date = current_time.date_naive();

TotalBlocks::init_recursively(&db.connection, &current_time)
.await
.unwrap();

fill_mock_blockscout_data(&blockscout.connection, current_date).await;

// need to analyze or vacuum for `reltuples` to be updated.
// source: https://www.postgresql.org/docs/9.3/planner-stats.html
let _ = blockscout
.connection
.execute(Statement::from_string(DbBackend::Postgres, "ANALYZE;"))
.await
.unwrap();

let parameters = UpdateParameters {
db: &db,
blockscout: &blockscout,
blockscout_applied_migrations: BlockscoutMigrations::latest(),
update_time_override: Some(current_time),
force_full: false,
};
let cx: UpdateContext<'_> = UpdateContext::from_params_now_or_override(parameters.clone());
let data = get_counter::<TotalBlocks>(&cx).await;
assert_ne!("0", data.value);
test_counter_fallback::<TotalBlocks>("total_blocks_fallback").await;
}
}
100 changes: 91 additions & 9 deletions stats/stats/src/charts/counters/total_txns.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,60 @@
use crate::{
data_source::kinds::{
data_manipulation::{map::MapToString, sum_point::Sum},
local_db::DirectPointLocalDbChartSource,
charts::db_interaction::read::query_estimated_table_rows,
data_source::{
kinds::{
local_db::{parameters::ValueEstimation, DirectPointLocalDbChartSourceWithEstimate},
remote_db::{RemoteDatabaseSource, RemoteQueryBehaviour},
},
UpdateContext,
},
lines::NewTxnsInt,
ChartProperties, MissingDatePolicy, Named,
range::UniversalRange,
types::timespans::DateValue,
utils::MarkedDbConnection,
ChartError, ChartProperties, MissingDatePolicy, Named,
};

use chrono::NaiveDate;
use blockscout_db::entity::{blocks, transactions};
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use entity::sea_orm_active_enums::ChartType;
use sea_orm::{
prelude::Expr, ColumnTrait, EntityName, EntityTrait, PaginatorTrait, QueryFilter, QuerySelect,
};

/// Marker type implementing `RemoteQueryBehaviour` for the total transactions counter.
pub struct TotalTxnsQueryBehaviour;

impl RemoteQueryBehaviour for TotalTxnsQueryBehaviour {
type Output = DateValue<String>;

/// Queries Blockscout for the total transactions point.
///
/// The point is dated with the date of the latest consensus block timestamp and
/// carries the row count of the `transactions` query as its value. The requested
/// range is ignored.
///
/// # Errors
/// - `ChartError::BlockscoutDB` if either database query fails.
/// - `ChartError::Internal` if the block timestamp query yields no row.
async fn query_data(
    cx: &UpdateContext<'_>,
    _range: UniversalRange<DateTime<Utc>>,
) -> Result<Self::Output, ChartError> {
    let conn = cx.blockscout.connection.as_ref();

    // Latest timestamp among consensus blocks; it dates the resulting point.
    let latest_block_query = blocks::Entity::find()
        .select_only()
        .column_as(Expr::col(blocks::Column::Timestamp).max(), "timestamp")
        .filter(blocks::Column::Consensus.eq(true))
        .into_tuple::<NaiveDateTime>();
    let Some(latest_block_time) = latest_block_query
        .one(conn)
        .await
        .map_err(ChartError::BlockscoutDB)?
    else {
        return Err(ChartError::Internal(
            "no block timestamps in database".into(),
        ));
    };

    // Exact count, computed independently of any previously stored chart data.
    let txns_total = transactions::Entity::find()
        .select_only()
        .count(conn)
        .await
        .map_err(ChartError::BlockscoutDB)?;

    Ok(DateValue::<String> {
        timespan: latest_block_time.date(),
        value: txns_total.to_string(),
    })
}
bragov4ik marked this conversation as resolved.
Show resolved Hide resolved
}

/// Remote database source driven by `TotalTxnsQueryBehaviour`.
pub type TotalTxnsRemote = RemoteDatabaseSource<TotalTxnsQueryBehaviour>;

pub struct Properties;

Expand All @@ -29,16 +75,52 @@ impl ChartProperties for Properties {
}
}

pub type TotalTxns = DirectPointLocalDbChartSource<MapToString<Sum<NewTxnsInt>>, Properties>;
/// Marker type implementing `ValueEstimation` for the total transactions counter.
pub struct TotalTxnsEstimation;

impl ValueEstimation for TotalTxnsEstimation {
/// Builds an estimated "total transactions" point from the estimated row count of
/// the `transactions` table, dated with the current UTC date.
///
/// # Errors
/// Returns `ChartError::BlockscoutDB` if the row-estimate query fails.
async fn estimate(blockscout: &MarkedDbConnection) -> Result<DateValue<String>, ChartError> {
// `now()` is more relevant when taken right before the query rather than
// `cx.time` measured a bit earlier.
let now = Utc::now();
let value = query_estimated_table_rows(
blockscout.connection.as_ref(),
transactions::Entity.table_name(),
)
.await
.map_err(ChartError::BlockscoutDB)?
// A missing estimate, or one that cannot be converted to `u64`, is reported as 0.
.map(|n| u64::try_from(n).unwrap_or(0))
.unwrap_or(0);
sevenzing marked this conversation as resolved.
Show resolved Hide resolved
Ok(DateValue {
timespan: now.date_naive(),
value: value.to_string(),
bragov4ik marked this conversation as resolved.
Show resolved Hide resolved
})
}
}

// This counter will soon need to update on data that is not fully indexed,
// therefore it is kept separate from `NewTxns`.
//
// A separate query that does not rely on previous computation helps this counter
// to work in such environments.
//
// todo: make it dependent again if #845 is resolved
pub type TotalTxns =
DirectPointLocalDbChartSourceWithEstimate<TotalTxnsRemote, TotalTxnsEstimation, Properties>;

#[cfg(test)]
mod tests {
use super::*;
use crate::tests::simple_test::simple_test_counter;
use crate::tests::simple_test::{simple_test_counter, test_counter_fallback};

#[tokio::test]
#[ignore = "needs database to run"]
async fn update_total_txns() {
simple_test_counter::<TotalTxns>("update_total_txns", "47", None).await;
simple_test_counter::<TotalTxns>("update_total_txns", "48", None).await;
}

// Checks that `TotalTxns` yields a non-zero fallback value when Blockscout
// data is populated but no update has been called on the counter.
#[tokio::test]
#[ignore = "needs database to run"]
async fn total_txns_fallback() {
test_counter_fallback::<TotalTxns>("total_txns_fallback").await;
}
}
5 changes: 3 additions & 2 deletions stats/stats/src/charts/lines/new_txns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ impl StatementFromRange for NewTxnsStatement {
range: Option<Range<DateTime<Utc>>>,
completed_migrations: &BlockscoutMigrations,
) -> Statement {
// do not filter by `!= to_timestamp(0)` because
// 1. it allows to use index `transactions_block_consensus_index`
// 2. there is no reason not to count genesis transactions
if completed_migrations.denormalization {
sql_with_range_filter_opt!(
DbBackend::Postgres,
Expand All @@ -44,7 +47,6 @@ impl StatementFromRange for NewTxnsStatement {
COUNT(*)::TEXT as value
FROM transactions t
WHERE
t.block_timestamp != to_timestamp(0) AND
t.block_consensus = true {filter}
GROUP BY date;
"#,
Expand All @@ -62,7 +64,6 @@ impl StatementFromRange for NewTxnsStatement {
FROM transactions t
JOIN blocks b ON t.block_hash = b.hash
WHERE
b.timestamp != to_timestamp(0) AND
b.consensus = true {filter}
GROUP BY date;
"#,
Expand Down
39 changes: 39 additions & 0 deletions stats/stats/src/tests/simple_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
use blockscout_service_launcher::test_database::TestDbGuard;
use chrono::{DateTime, NaiveDateTime, Utc};
use pretty_assertions::assert_eq;
use sea_orm::{ConnectionTrait, DbBackend, Statement};
use stats_proto::blockscout::stats::v1::Point;
use std::{fmt::Debug, str::FromStr};

Expand Down Expand Up @@ -330,6 +331,44 @@ async fn simple_test_counter_inner<C>(
assert_eq!(expected, get_counter::<C>(&cx).await.value);
}

/// Checks that counter `C` reports a non-zero fallback value when
/// - the Blockscout database contains mock data, and
/// - no update has ever been run for the counter.
///
/// `test_name` is passed to the test database initialization.
///
/// # Panics
/// Panics if any setup step fails or if the retrieved counter value equals `"0"`.
pub async fn test_counter_fallback<C>(test_name: &str)
where
    C: DataSource + ChartProperties + QuerySerialized<Output = DateValue<String>>,
{
    let _ = tracing_subscriber::fmt::try_init();
    let (stats_db, blockscout_db) = init_marked_db_all(test_name).await;
    let now = chrono::DateTime::from_str("2023-03-01T12:00:00Z").unwrap();

    // Only initialize the chart; updating it is deliberately left out.
    C::init_recursively(&stats_db.connection, &now)
        .await
        .unwrap();

    fill_mock_blockscout_data(&blockscout_db.connection, now.date_naive()).await;

    // need to analyze or vacuum for `reltuples` to be updated.
    // source: https://www.postgresql.org/docs/9.3/planner-stats.html
    let analyze = Statement::from_string(DbBackend::Postgres, "ANALYZE;");
    let _ = blockscout_db.connection.execute(analyze).await.unwrap();

    let cx: UpdateContext<'_> = UpdateContext::from_params_now_or_override(UpdateParameters {
        db: &stats_db,
        blockscout: &blockscout_db,
        blockscout_applied_migrations: BlockscoutMigrations::latest(),
        update_time_override: Some(now),
        force_full: false,
    });
    let counter = get_counter::<C>(&cx).await;
    assert_ne!("0", counter.value);
}

pub async fn prepare_chart_test<C: DataSource + ChartProperties>(
test_name: &str,
init_time: Option<NaiveDateTime>,
Expand Down
Loading
Loading