Skip to content

Commit

Permalink
feat(stats): wait until blockscout is indexed (#1075)
Browse files Browse the repository at this point in the history
- configurable thresholds for ratios returned from indexing-status endpoint
- wait until they're satisfied & launch stats
- env for blockscout api that is required unless the next env is set
- `IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` env that disables blockscout api related features if the API is not provided
  • Loading branch information
bragov4ik authored Oct 8, 2024
1 parent d2cf3d0 commit 19dd633
Show file tree
Hide file tree
Showing 15 changed files with 823 additions and 112 deletions.
465 changes: 367 additions & 98 deletions stats/Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion stats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ members = [
]

[workspace.dependencies]
blockscout-client = { git = "https://github.com/blockscout/blockscout-rs/", rev = "506b821" }
blockscout-service-launcher = { version = "0.13.1" }

rstest = "0.23.0"
wiremock = "0.6.2"

# todo: update version after https://github.com/chronotope/chrono/pull/1600
# and remove patch
Expand Down
11 changes: 9 additions & 2 deletions stats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,14 @@ by enabling word wrapping
| `STATS__FORCE_​UPDATE_ON_START` | | Fully recalculate all charts on start | `false` |
| `STATS__CONCURRENT_​START_UPDATES` | | Amount of concurrent charts update on start | `3` |
| `STATS__​DEFAULT_​SCHEDULE` | | Schedule used for update groups with no config | `"0 0 1 * * * *"` |
| `STATS__LIMITS__REQUESTED_​POINTS_LIMIT` | | Maximum allowed number of requested points | `182500` |
| `STATS__LIMITS__REQUESTED_​POINTS_LIMIT` | | Maximum allowed number of requested points | `182500` |
| `STATS__BLOCKSCOUT_API_URL` | Required unless `STATS__​IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` is set to `true`. | URL to Blockscout API. | `null` |
| `STATS__CONDITIONAL_​START__CHECK_PERIOD_SECS` | | Time between start condition checking (if they are not satisfied) | `5` |
| `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​ENABLED` | | Enable `blocks_​ratio` threshold | `true` |
| `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​THRESHOLD` | | Value for `blocks_​ratio` threshold | `0.98` |
| `STATS__CONDITIONAL_​START__INTERNAL_​TRANSACTIONS_RATIO__​ENABLED` | | Enable `internal_​transactions_​ratio` threshold | `true` |
| `STATS__CONDITIONAL_​START__INTERNAL_​TRANSACTIONS_RATIO__​THRESHOLD` | | Value for `internal_​transactions_​ratio` threshold | `0.98` |
| `STATS__IGNORE_​BLOCKSCOUT_API_ABSENCE` | | Disable requirement for blockscout api url setting. Turns off corresponding features if the api setting is not set | `false` |

[anchor]: <> (anchors.envs.end.service)

Expand Down Expand Up @@ -163,7 +170,7 @@ by enabling word wrapping
| `STATS_CHARTS__​LINE_CHARTS__​<LINE_CHART_NAME>__​RESOLUTIONS__YEAR` | | Enable yearly data | `true` if defined |
| `STATS_CHARTS__​LINE_CHARTS__​<LINE_CHART_NAME>__​TITLE` | | Displayed name of `<LINE_CHART_NAME>`, e.g. `"Some line chart title"` | `null` |
| `STATS_CHARTS__​LINE_CHARTS__​<LINE_CHART_NAME>__​UNITS` | | Measurement units, e.g. `"{{<variable_name>}}"` | `null` |
| `STATS_CHARTS__​TEMPLATE_VALUES__​<VARIABLE_NAME>` | | Value to substitute instead of `{{<variable_name>}}`, e.g. `"some_value"` | `null` |
| `STATS_CHARTS__​TEMPLATE_VALUES__​<VARIABLE_NAME>` | | Value to substitute instead of `{{<variable_name>}}`, e.g. `STATS_CHARTS__​TEMPLATE_VALUES__​NATIVE_COIN_SYMBOL​="some_value"`. See full list of variables in charts config file (`charts.json`). | `null` |

[anchor]: <> (anchors.envs.end.charts)

Expand Down
3 changes: 3 additions & 0 deletions stats/justfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ test-with-db *args:
check-envs:
VALIDATE_ONLY=true cargo run --bin env-docs-generation

generate-envs:
cargo run --bin env-docs-generation

restart-generate-entities:
-just stop-postgres
just start-postgres
Expand Down
2 changes: 1 addition & 1 deletion stats/stats-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ actix-web = "4"
prost = "0.11"
tonic = "0.8"
serde = { version = "1", features = ["derive"] }
serde_with = { version = "2.0", features = ["hex", "base64"] }
serde_with = { version = "3", features = ["hex", "base64"] }
async-trait = "0.1"

[dev-dependencies]
Expand Down
8 changes: 6 additions & 2 deletions stats/stats-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ stats = { path = "../stats" }
stats-proto = { path = "../stats-proto" }
async-trait = "0.1"
actix-web = "4"
reqwest = "0.12"
tonic = "0.8"
serde = { version = "1", features = ["derive"] }
serde_with = { version = "2.0", features = ["hex", "base64"] }
serde_with = { version = "3", features = ["hex", "base64"] }
bytes = "1.2"
tokio = { version = "1", features = ["rt-multi-thread"] }
config = "0.13"
Expand All @@ -25,16 +26,19 @@ sea-orm = { version = "0.12", features = [
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
blockscout-service-launcher = { workspace = true, features = [ "database-0_12" ] }
blockscout-endpoint-swagger = { git = "https://github.com/blockscout/blockscout-rs", rev = "0d01bdf7" }
blockscout-client = { workspace = true }
cron = "0.12"
convert_case = "0.6.0"
itertools = "0.13.0"
liquid-json = "0.5.0"
serde_json = "1.0"
paste = "1.0"

url = { version = "2.5", features = ["serde"] }

[dev-dependencies]
stats = { path = "../stats", features = ["test-utils"] }
blockscout-service-launcher = { workspace = true, features = [ "database-0_12", "test-server" ] }
pretty_assertions = "1.3"
reqwest = "0.12"
wiremock = { workspace = true }
rstest = { workspace = true }
284 changes: 284 additions & 0 deletions stats/stats-server/src/blockscout_waiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
use std::time::Duration;

use crate::settings::{Settings, StartConditionSettings, ToggleableThreshold};

use anyhow::Context;
use blockscout_service_launcher::launcher::ConfigSettings;
use reqwest::StatusCode;
use tokio::time::sleep;
use tracing::{info, warn};

fn is_retryable_code(status_code: &reqwest::StatusCode) -> bool {
matches!(
*status_code,
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT
| StatusCode::TOO_MANY_REQUESTS
| StatusCode::IM_A_TEAPOT
)
}

fn is_threshold_passed(
threshold: &ToggleableThreshold,
float_value: Option<String>,
value_name: &str,
) -> Result<bool, anyhow::Error> {
let threshold = if threshold.enabled {
threshold.threshold
} else {
return Ok(true);
};
let value = float_value
.map(|s| s.parse::<f64>())
.transpose()
.context(format!("Parsing `{value_name}`"))?;
let Some(value) = value else {
anyhow::bail!("Received `null` value of `{value_name}`. Can't determine indexing status.",);
};
if value < threshold {
info!(
threshold = threshold,
current_value = value,
"Threshold for `{value_name}` is not satisfied"
);
Ok(false)
} else {
info!(
threshold = threshold,
current_value = value,
"Threshold for `{value_name}` is satisfied"
);
Ok(true)
}
}

pub async fn wait_for_blockscout_indexing(
api_config: blockscout_client::Configuration,
wait_config: StartConditionSettings,
) -> Result<(), anyhow::Error> {
loop {
match blockscout_client::apis::main_page_api::get_indexing_status(&api_config).await {
Ok(result)
if is_threshold_passed(
&wait_config.blocks_ratio,
result.indexed_blocks_ratio.clone(),
"indexed_blocks_ratio",
)
.context("check index block ratio")?
&& is_threshold_passed(
&wait_config.internal_transactions_ratio,
result.indexed_internal_transactions_ratio.clone(),
"indexed_internal_transactions_ratio",
)? =>
{
info!("Blockscout indexing threshold passed");
return Ok(());
}
Ok(_) => {}
Err(blockscout_client::Error::ResponseError(r)) if is_retryable_code(&r.status) => {
warn!("Error from indexing status endpoint: {r:?}");
}
Err(e) => {
return Err(e).context("Requesting indexing status");
}
}
info!(
"Blockscout is not indexed enough. Checking again in {} secs",
wait_config.check_period_secs
);
sleep(Duration::from_secs(wait_config.check_period_secs.into())).await;
}
}

pub async fn init_blockscout_api_client(
settings: &Settings,
) -> anyhow::Result<Option<blockscout_client::Configuration>> {
match (settings.ignore_blockscout_api_absence, &settings.blockscout_api_url) {
(_, Some(blockscout_api_url)) => Ok(Some(blockscout_client::Configuration::new(blockscout_api_url.clone()))),
(true, None) => {
info!(
"Blockscout API URL has not been provided and `IGNORE_BLOCKSCOUT_API_ABSENCE` setting is \
set to `true`. Disabling API-related functionality."
);
Ok(None)
}
(false, None) => anyhow::bail!(
"Blockscout API URL has not been provided. Please specify it with corresponding \
env variable (`{0}__BLOCKSCOUT_API_URL`) or set `{0}__IGNORE_BLOCKSCOUT_API_ABSENCE=true` to disable \
functionality depending on the API.",
Settings::SERVICE_NAME
),
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;

use rstest::*;
use std::time::Duration;
use tokio::{task::JoinSet, time::error::Elapsed};
use url::Url;
use wiremock::{
matchers::{method, path},
Mock, MockServer, ResponseTemplate,
};

use super::*;

async fn mock_indexing_status(response: ResponseTemplate) -> MockServer {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/api/v2/main-page/indexing-status"))
.respond_with(response)
.mount(&mock_server)
.await;
mock_server
}

async fn test_wait_indexing(
wait_config: StartConditionSettings,
response: ResponseTemplate,
) -> Result<Result<(), anyhow::Error>, Elapsed> {
let server = mock_indexing_status(response).await;
tokio::time::timeout(
Duration::from_millis(500),
wait_for_blockscout_indexing(
blockscout_client::Configuration::new(Url::from_str(&server.uri()).unwrap()),
wait_config,
),
)
.await
}

#[fixture]
fn wait_config(
#[default(0.9)] blocks: f64,
#[default(0.9)] internal_transactions: f64,
) -> StartConditionSettings {
StartConditionSettings {
blocks_ratio: ToggleableThreshold::enabled(blocks),
internal_transactions_ratio: ToggleableThreshold::enabled(internal_transactions),
check_period_secs: 0,
}
}

#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_works_with_200_response(
wait_config: StartConditionSettings,
) {
test_wait_indexing(
wait_config.clone(),
ResponseTemplate::new(200).set_body_string(
r#"{
"finished_indexing": true,
"finished_indexing_blocks": true,
"indexed_blocks_ratio": "1.00",
"indexed_internal_transactions_ratio": "1"
}"#,
),
)
.await
.expect("must not timeout")
.expect("must not error");

test_wait_indexing(
wait_config,
ResponseTemplate::new(200).set_body_string(
r#"{
"finished_indexing": false,
"finished_indexing_blocks": false,
"indexed_blocks_ratio": "0.80",
"indexed_internal_transactions_ratio": "0.80"
}"#,
),
)
.await
.expect_err("must time out");
}

#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_works_with_slow_response(
wait_config: StartConditionSettings,
) {
test_wait_indexing(
wait_config,
ResponseTemplate::new(200)
.set_body_string(
r#"{
"finished_indexing": false,
"finished_indexing_blocks": false,
"indexed_blocks_ratio": "1.0",
"indexed_internal_transactions_ratio": "1.0"
}"#,
)
.set_delay(Duration::from_millis(100)),
)
.await
.expect("must not timeout")
.expect("must not error")
}

#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_works_with_infinite_timeout(
wait_config: StartConditionSettings,
) {
test_wait_indexing(
wait_config,
ResponseTemplate::new(200)
.set_body_string(
r#"{
"finished_indexing": false,
"finished_indexing_blocks": false,
"indexed_blocks_ratio": "0.80",
"indexed_internal_transactions_ratio": "0.80"
}"#,
)
.set_delay(Duration::MAX),
)
.await
.expect_err("must time out");
}

#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_retries_with_error_codes(
wait_config: StartConditionSettings,
) {
let mut error_servers = JoinSet::from_iter([
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(429)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(500)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(503)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(504)),
]);
#[allow(for_loops_over_fallibles)]
for server in error_servers.join_next().await {
let test_result = server.unwrap();
test_result.expect_err("must time out");
}
}

#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_fails_with_error_codes(
wait_config: StartConditionSettings,
) {
let mut error_servers = JoinSet::from_iter([
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(400)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(403)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(404)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(405)),
]);
#[allow(for_loops_over_fallibles)]
for server in error_servers.join_next().await {
let test_result = server.unwrap();
test_result
.expect("must fail immediately")
.expect_err("must report error");
}
}
}
1 change: 1 addition & 0 deletions stats/stats-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod blockscout_waiter;
mod config;
mod health;
mod read_service;
Expand Down
Loading

0 comments on commit 19dd633

Please sign in to comment.