From 119090cf683a7e3eb0ab1abdb868fd4add9f2121 Mon Sep 17 00:00:00 2001 From: Kirill Ivanov <8144358+bragov4ik@users.noreply.github.com> Date: Wed, 13 Nov 2024 10:03:40 +0300 Subject: [PATCH] feat(stats): The Migration of The Backend charts (Part 1: `average_block_time`) (#1117) The first installment in the Migrate Backend Charts franchise, it stars [Commit 1](1692f3f365e4a6253d4ab7319a01dbfee7aaf071), [Commit 2](d667528bdbcaa29f1d4011d4b0cb46fa2c411663), [Commit 3](3269838c680fd6dbba072a3ff668f1281552b470), [Commit 4](f531523ff0949a59419148d3ded32ee254cdea4f), and [Commit 5](683c6383f835044c5eafe673cd0acbc9b226b815) in the main cast. In the PR, `average_block_time` chart is adjusted according to implementation in Blockscout backend. Additionally, added a fallback for cases when not enough blocks are indexed yet. Even though the info may be inaccurate in this case, it's better than nothing (+ it will refresh pretty often so accuracy will be achieved quickly) --- stats/docker-compose.dev.yml | 1 + stats/stats-server/tests/it/counters.rs | 1 + .../src/charts/counters/average_block_time.rs | 214 +++++++++++++++--- stats/stats/src/charts/db_interaction/read.rs | 2 +- stats/stats/src/tests/mock_blockscout.rs | 43 +++- stats/stats/src/tests/simple_test.rs | 28 ++- stats/stats/src/utils.rs | 3 + 7 files changed, 241 insertions(+), 51 deletions(-) diff --git a/stats/docker-compose.dev.yml b/stats/docker-compose.dev.yml index a8ddb44c1..eee03d893 100644 --- a/stats/docker-compose.dev.yml +++ b/stats/docker-compose.dev.yml @@ -115,6 +115,7 @@ services: - STATS__SERVER__HTTP__ENABLED=true - STATS__SERVER__HTTP__ADDR=0.0.0.0:8050 - STATS__SERVER__HTTP__MAX_BODY_SIZE=2097152 + - STATS__BLOCKSCOUT_API_URL=http://backend:4000 # - STATS__FORCE_UPDATE_ON_START=true - RUST_BACKTRACE=1 ports: diff --git a/stats/stats-server/tests/it/counters.rs b/stats/stats-server/tests/it/counters.rs index 55a928ba0..335739782 100644 --- a/stats/stats-server/tests/it/counters.rs +++ 
b/stats/stats-server/tests/it/counters.rs @@ -3,6 +3,7 @@ use blockscout_service_launcher::{ test_server::{get_test_server_settings, init_server, send_get_request}, }; use chrono::NaiveDate; +use pretty_assertions::assert_eq; use stats::tests::{ init_db::init_db_all, diff --git a/stats/stats/src/charts/counters/average_block_time.rs b/stats/stats/src/charts/counters/average_block_time.rs index c79feecc0..0e4fb595f 100644 --- a/stats/stats/src/charts/counters/average_block_time.rs +++ b/stats/stats/src/charts/counters/average_block_time.rs @@ -1,48 +1,117 @@ +use std::{cmp::Reverse, ops::Range}; + use crate::{ data_source::{ kinds::{ data_manipulation::map::MapToString, local_db::DirectPointLocalDbChartSource, - remote_db::{PullOne, RemoteDatabaseSource, StatementForOne}, + remote_db::{RemoteDatabaseSource, RemoteQueryBehaviour}, }, - types::BlockscoutMigrations, + UpdateContext, }, - ChartProperties, MissingDatePolicy, Named, + types::TimespanValue, + utils::NANOS_PER_SEC, + ChartProperties, MissingDatePolicy, Named, UpdateError, }; +use blockscout_db::entity::blocks; use chrono::NaiveDate; use entity::sea_orm_active_enums::ChartType; -use sea_orm::{DbBackend, Statement}; - -pub struct AverageBlockTimeStatement; - -impl StatementForOne for AverageBlockTimeStatement { - fn get_statement(_: &BlockscoutMigrations) -> Statement { - Statement::from_sql_and_values( - DbBackend::Postgres, - r#" - SELECT - max(timestamp)::date as date, - (CASE WHEN avg(diff) IS NULL THEN 0 ELSE avg(diff) END)::float as value - FROM - ( - SELECT - timestamp, - EXTRACT( - EPOCH FROM timestamp - lag(timestamp) OVER (ORDER BY timestamp) - ) as diff - FROM blocks b - WHERE b.timestamp != to_timestamp(0) AND consensus = true - ) t - "#, - vec![], - ) +use itertools::Itertools; +use sea_orm::{prelude::*, DbBackend, FromQueryResult, QueryOrder, QuerySelect, Statement}; + +pub const LIMIT_BLOCKS: u64 = 100; +pub const OFFSET_BLOCKS: u64 = 100; + +fn average_block_time_statement(offset: u64) -> 
Statement { + blocks::Entity::find() + .select_only() + .column_as(Expr::col(blocks::Column::Timestamp), "timestamp") + // Do not count genesis block because it results in weird block time. + // We assume that genesis block number is 1 or 0 (just to be safe). + // If it's not, weird block time will be present only for + // `LIMIT_BLOCKS` blocks, so it's not a big deal. + .filter(blocks::Column::Number.gt(1)) + .limit(LIMIT_BLOCKS) + // top state is considered quite unstable which is not great for computing + // the metric + .offset(offset) + .order_by_desc(blocks::Column::Number) + // Not configurable because `false` seems to be completely unused + .filter(blocks::Column::Consensus.eq(true)) + .into_model::<BlockTimestamp>() + .into_statement(DbBackend::Postgres) +} + +#[derive(FromQueryResult, Debug)] +struct BlockTimestamp { + timestamp: chrono::NaiveDateTime, +} + +async fn query_average_block_time( + cx: &UpdateContext<'_>, + offset: u64, +) -> Result<Option<TimespanValue<NaiveDate, f64>>, UpdateError> { + let query = average_block_time_statement(offset); + let block_timestamps = BlockTimestamp::find_by_statement(query) + .all(cx.blockscout) + .await + .map_err(UpdateError::BlockscoutDB)?; + Ok(calculate_average_block_time(block_timestamps)) +} + +pub struct AverageBlockTimeQuery; + +impl RemoteQueryBehaviour for AverageBlockTimeQuery { + type Output = TimespanValue<NaiveDate, f64>; + + async fn query_data( + cx: &UpdateContext<'_>, + _range: Option<Range<DateTimeUtc>>, + ) -> Result<Self::Output, UpdateError> { + match query_average_block_time(cx, OFFSET_BLOCKS).await? { + Some(avg_block_time) => Ok(avg_block_time), + None => query_average_block_time(cx, 0) + .await? + .ok_or(UpdateError::Internal( + "No blocks were returned to calculate average block time".into(), + )), + } } } -pub type AverageBlockTimeRemote = - RemoteDatabaseSource<PullOne<AverageBlockTimeStatement>>; +// Time in seconds. `None` if the vector is empty.
+fn calculate_average_block_time( + mut timestamps: Vec<BlockTimestamp>, +) -> Option<TimespanValue<NaiveDate, f64>> { + // data is expected to be already sorted; in this case + // the time complexity is linear + timestamps.sort_unstable_by_key(|x| Reverse(x.timestamp)); + let last_block_date = timestamps.first()?.timestamp.date(); + // ensure it's sorted somehow + let block_times_s = timestamps + .iter() + .tuple_windows::<(_, _)>() + .map(|(cur, prev)| { + let time_diff = cur.timestamp - prev.timestamp; + // formula from `subsec_nanos()` docs + let diff_ns = + time_diff.subsec_nanos() as i64 + time_diff.num_seconds() * NANOS_PER_SEC as i64; + diff_ns as f64 / NANOS_PER_SEC as f64 + }) + .collect_vec(); + let count = block_times_s.len(); + if count == 0 { + return None; + } + let average_block_time_seconds = block_times_s.iter().sum::<f64>() / count as f64; + Some(TimespanValue { + timespan: last_block_date, + value: average_block_time_seconds, + }) +} +pub type AverageBlockTimeRemote = RemoteDatabaseSource<AverageBlockTimeQuery>; pub type AverageBlockTimeRemoteString = MapToString<AverageBlockTimeRemote>; pub struct Properties; @@ -68,15 +137,94 @@ pub type AverageBlockTime = DirectPointLocalDbChartSource<AverageBlockTimeRemoteString, Properties>; #[cfg(test)] mod tests { use super::*; - use crate::tests::simple_test::simple_test_counter; + use crate::{ + data_source::{DataSource, UpdateContext, UpdateParameters}, + tests::{ + mock_blockscout::fill_many_blocks, + simple_test::{get_counter, prepare_chart_test, simple_test_counter}, + }, + types::BlockscoutMigrations, + }; + use chrono::TimeDelta; + use itertools::Itertools; + use std::iter::repeat; #[tokio::test] #[ignore = "needs database to run"] async fn update_average_block_time() { + let (current_time, db, blockscout) = + prepare_chart_test::<AverageBlockTime>("update_average_block_time", None).await; + + let times_generator = [100u64, 200, 300]; + let block_times = repeat(1) + // genesis is not counted + // (2 because we consider block 1 as genesis just in case) + .take(2) + .chain( + times_generator + .into_iter() + .cycle() + // -1 since for `N` blocks there are `N - 1` time deltas + .take((LIMIT_BLOCKS - 1) as usize), + ) + // will be skipped + .chain(repeat(1).take(OFFSET_BLOCKS as usize)) + .map(|x| TimeDelta::seconds(x as i64)) + .collect_vec(); + let expected_avg = { + let limit_block_times = LIMIT_BLOCKS - 1; + let generator_len = u64::try_from(times_generator.len()).unwrap(); + // how many times the full `times_generator` sequence is repeated within considered blocks + let full_generator_repeats = limit_block_times / generator_len; + let full_repeats_sum = times_generator.iter().sum::<u64>() * 
full_generator_repeats; + // how many elements of `times_generator` are taken for the last repeat + let partial_repeat_elements_taken = limit_block_times % generator_len; + let partial_repeat_sum = times_generator + .iter() + .take(partial_repeat_elements_taken as usize) + .sum::<u64>(); + assert_eq!( + partial_repeat_elements_taken + full_generator_repeats * generator_len, + limit_block_times + ); + let total_sum = full_repeats_sum + partial_repeat_sum; + total_sum as f64 / limit_block_times as f64 + }; + fill_many_blocks(&blockscout, current_time.naive_utc(), &block_times).await; + let mut parameters = UpdateParameters { + db: &db, + blockscout: &blockscout, + blockscout_applied_migrations: BlockscoutMigrations::latest(), + update_time_override: Some(current_time), + force_full: true, + }; + let cx = UpdateContext::from_params_now_or_override(parameters.clone()); + AverageBlockTime::update_recursively(&cx).await.unwrap(); + assert_eq!( + expected_avg.to_string(), + get_counter::<AverageBlockTime>(&db).await + ); + parameters.force_full = false; + let cx = UpdateContext::from_params_now_or_override(parameters.clone()); + AverageBlockTime::update_recursively(&cx).await.unwrap(); + assert_eq!( + expected_avg.to_string(), + get_counter::<AverageBlockTime>(&db).await + ); + } + + #[tokio::test] + #[ignore = "needs database to run"] + async fn update_average_block_time_fallback() { + // if there are not enough blocks to use offset, calculate from available data simple_test_counter::<AverageBlockTime>( - "update_average_block_time", - "802200.0833333334", + "update_average_block_time_fallback", + // first 2 blocks are excluded + "958320", None, ) .await; diff --git a/stats/stats/src/charts/db_interaction/read.rs b/stats/stats/src/charts/db_interaction/read.rs index 9167f83c3..ef490c2cb 100644 --- a/stats/stats/src/charts/db_interaction/read.rs +++ b/stats/stats/src/charts/db_interaction/read.rs @@ -1067,7 +1067,7 @@ mod tests { async fn get_new_chart_data_returns_nothing() { let _ = tracing_subscriber::fmt::try_init(); - let db 
= init_db("get_chart_data_skipped_works").await; + let db = init_db("get_new_chart_data_returns_nothing").await; insert_mock_data(&db).await; let data = get_line_chart_data::<NaiveDate>( &db, diff --git a/stats/stats/src/tests/mock_blockscout.rs b/stats/stats/src/tests/mock_blockscout.rs index 0c9c422a3..1803225ad 100644 --- a/stats/stats/src/tests/mock_blockscout.rs +++ b/stats/stats/src/tests/mock_blockscout.rs @@ -4,7 +4,7 @@ use blockscout_db::entity::{ address_coin_balances_daily, addresses, block_rewards, blocks, internal_transactions, migrations_status, smart_contracts, tokens, transactions, }; -use chrono::{NaiveDate, NaiveDateTime}; +use chrono::{NaiveDate, NaiveDateTime, TimeDelta}; use rand::{Rng, SeedableRng}; use sea_orm::{prelude::Decimal, ActiveValue::NotSet, DatabaseConnection, EntityTrait, Set}; use std::str::FromStr; @@ -72,7 +72,7 @@ pub async fn fill_mock_blockscout_data(blockscout: &DatabaseConnection, max_date .into_iter() .filter(|val| NaiveDateTime::from_str(val).unwrap().date() <= max_date) .enumerate() - .map(|(ind, ts)| mock_block(ind as i64, ts, true)) + .map(|(ind, ts)| mock_block(ind as i64, NaiveDateTime::from_str(ts).unwrap(), true)) .collect::<Vec<_>>(); blocks::Entity::insert_many(blocks.clone()) .exec(blockscout) .await @@ -228,7 +228,13 @@ pub async fn fill_mock_blockscout_data(blockscout: &DatabaseConnection, max_date .into_iter() .filter(|val| NaiveDateTime::from_str(val).unwrap().date() <= max_date) .enumerate() - .map(|(ind, ts)| mock_block((ind + blocks.len()) as i64, ts, false)); + .map(|(ind, ts)| { + mock_block( + (ind + blocks.len()) as i64, + NaiveDateTime::from_str(ts).unwrap(), + false, + ) + }); blocks::Entity::insert_many(useless_blocks) .exec(blockscout) .await @@ -303,13 +309,38 @@ pub async fn fill_mock_blockscout_data(blockscout: &DatabaseConnection, max_date .unwrap(); } -fn mock_block(index: i64, ts: &str, consensus: bool) -> blocks::ActiveModel { - let size = 1000 + (index as i32 * 15485863) % 5000; +/// `block_times` - block time 
for each block from the 2nd to the latest. +/// +/// `<number of blocks> = <number of block times> + 1` +pub async fn fill_many_blocks( + blockscout: &DatabaseConnection, + latest_block_time: NaiveDateTime, + block_times: &[TimeDelta], +) { + let mut blocks_timestamps_reversed = Vec::with_capacity(block_times.len() + 1); + blocks_timestamps_reversed.push(latest_block_time); + for time_diff in block_times.iter().rev() { + let next_timestamp = *blocks_timestamps_reversed.last().unwrap() - *time_diff; + blocks_timestamps_reversed.push(next_timestamp); + } + let blocks_timestamps = blocks_timestamps_reversed.into_iter().rev(); + let blocks = blocks_timestamps + .enumerate() + .map(|(ind, ts)| mock_block(ind as i64, ts, true)) + .collect::<Vec<_>>(); + blocks::Entity::insert_many(blocks.clone()) + .exec(blockscout) + .await + .unwrap(); +} + +fn mock_block(index: i64, ts: NaiveDateTime, consensus: bool) -> blocks::ActiveModel { + let size = (1000 + (index * 15485863) % 5000) as i32; let gas_limit = if index <= 3 { 12_500_000 } else { 30_000_000 }; blocks::ActiveModel { number: Set(index), hash: Set(index.to_le_bytes().to_vec()), - timestamp: Set(NaiveDateTime::from_str(ts).unwrap()), + timestamp: Set(ts), consensus: Set(consensus), gas_limit: Set(Decimal::new(gas_limit, 0)), gas_used: Set(Decimal::from(size * 10)), diff --git a/stats/stats/src/tests/simple_test.rs b/stats/stats/src/tests/simple_test.rs index db37e8d98..cdd4a0fd0 100644 --- a/stats/stats/src/tests/simple_test.rs +++ b/stats/stats/src/tests/simple_test.rs @@ -68,14 +68,11 @@ where C: DataSource + ChartProperties, C::Resolution: Ord + Clone + Debug, { - let _ = tracing_subscriber::fmt::try_init(); + let (current_time, db, blockscout) = prepare_chart_test::<C>(test_name, None).await; let expected = map_str_tuple_to_owned(expected); - let (db, blockscout) = init_db_all(test_name).await; - let current_time = DateTime::from_str("2023-03-01T12:00:00Z").unwrap(); + let approximate_trailing_points = C::approximate_trailing_points(); let current_date = 
current_time.date_naive(); - C::init_recursively(&db, &current_time).await.unwrap(); fill_mock_blockscout_data(&blockscout, current_date).await; - let approximate_trailing_points = C::approximate_trailing_points(); let mut parameters = UpdateParameters { db: &db, @@ -347,13 +344,9 @@ async fn simple_test_counter_inner<C>( update_time: Option<NaiveDateTime>, migrations: BlockscoutMigrations, ) { - let _ = tracing_subscriber::fmt::try_init(); - let (db, blockscout) = init_db_all(test_name).await; + let (current_time, db, blockscout) = prepare_chart_test::<C>(test_name, update_time).await; let max_time = DateTime::<Utc>::from_str("2023-03-01T12:00:00Z").unwrap(); - let current_time = update_time.map(|t| t.and_utc()).unwrap_or(max_time); let max_date = max_time.date_naive(); - - C::init_recursively(&db, &current_time).await.unwrap(); fill_mock_blockscout_data(&blockscout, max_date).await; let mut parameters = UpdateParameters { @@ -372,7 +365,20 @@ async fn simple_test_counter_inner<C>( assert_eq!(expected, get_counter::<C>(&db).await); } -async fn get_counter<C: ChartProperties>(db: &DatabaseConnection) -> String { +pub async fn prepare_chart_test<C: DataSource>( + test_name: &str, + init_time: Option<NaiveDateTime>, +) -> (DateTime<Utc>, TestDbGuard, TestDbGuard) { + let _ = tracing_subscriber::fmt::try_init(); + let (db, blockscout) = init_db_all(test_name).await; + let init_time = init_time + .map(|t| t.and_utc()) + .unwrap_or(DateTime::<Utc>::from_str("2023-03-01T12:00:00Z").unwrap()); + C::init_recursively(&db, &init_time).await.unwrap(); + (init_time, db, blockscout) +} + +pub async fn get_counter<C: ChartProperties>(db: &DatabaseConnection) -> String { let data = get_raw_counters(db).await.unwrap(); let data = &data[&C::name()]; data.value.clone() diff --git a/stats/stats/src/utils.rs b/stats/stats/src/utils.rs index 288c8d3fe..185da50e4 100644 --- a/stats/stats/src/utils.rs +++ b/stats/stats/src/utils.rs @@ -5,6 +5,9 @@ use std::ops::{Range, RangeInclusive}; use chrono::{NaiveDate, NaiveTime}; use sea_orm::{prelude::DateTimeUtc, Value}; +// this const is not public in `chrono` for 
some reason +pub const NANOS_PER_SEC: i32 = 1_000_000_000; + pub fn day_start(date: &NaiveDate) -> DateTimeUtc { date.and_time(NaiveTime::from_hms_opt(0, 0, 0).expect("correct time")) .and_utc()