From df1260cabb1a9d037b150b069e99f7ca37a64bb7 Mon Sep 17 00:00:00 2001 From: mrferris Date: Tue, 20 Feb 2024 19:27:19 -0500 Subject: [PATCH] feat: adds fourfours strategy to glados-audit --- entity/src/content_audit.rs | 3 ++ glados-audit/Cargo.toml | 3 +- glados-audit/src/cli.rs | 12 ++++- glados-audit/src/lib.rs | 10 ++++ glados-audit/src/selection.rs | 52 ++++++++++++++++++- glados-core/src/db.rs | 98 +++++++++++++++++++++++++++++++++++ glados-core/src/lib.rs | 1 + glados-monitor/src/lib.rs | 82 ++--------------------------- 8 files changed, 180 insertions(+), 81 deletions(-) create mode 100644 glados-core/src/db.rs diff --git a/entity/src/content_audit.rs b/entity/src/content_audit.rs index cc8cdab5..9f47ac21 100644 --- a/entity/src/content_audit.rs +++ b/entity/src/content_audit.rs @@ -37,6 +37,8 @@ pub enum SelectionStrategy { SelectOldestUnaudited = 3, /// Perform a single audit for a previously audited content key. SpecificContentKey = 4, + /// Perform audits of random fourfours data. 
+ FourFours = 5, } impl AuditResult { @@ -157,6 +159,7 @@ impl SelectionStrategy { SelectionStrategy::Latest => "Latest".to_string(), SelectionStrategy::Random => "Random".to_string(), SelectionStrategy::Failed => "Failed".to_string(), + SelectionStrategy::FourFours => "FourFours".to_string(), SelectionStrategy::SelectOldestUnaudited => "Select Oldest Unaudited".to_string(), SelectionStrategy::SpecificContentKey => "Specific Content Key".to_string(), } diff --git a/glados-audit/Cargo.toml b/glados-audit/Cargo.toml index 57be025c..a901d476 100644 --- a/glados-audit/Cargo.toml +++ b/glados-audit/Cargo.toml @@ -15,6 +15,7 @@ clap = { version = "4.0.24", features = ["derive"] } entity = { path = "../entity" } env_logger = "0.9.3" ethereum-types = "0.14.0" +web3 = "0.18.0" ethportal-api = "0.2.2" glados-core = { path = "../glados-core" } migration = { path = "../migration" } @@ -24,5 +25,3 @@ serde_json = "1.0.95" tokio = "1.21.2" tracing = "0.1.37" url = "2.3.1" -web3 = "0.18.0" - diff --git a/glados-audit/src/cli.rs b/glados-audit/src/cli.rs index 39ba026e..6c1f5d42 100644 --- a/glados-audit/src/cli.rs +++ b/glados-audit/src/cli.rs @@ -9,6 +9,8 @@ const DEFAULT_STATS_PERIOD: &str = "300"; pub struct Args { #[arg(short, long, default_value = DEFAULT_DB_URL)] pub database_url: String, + #[arg(short, long, default_value = "", help = "web3 api provider url")] + pub provider_url: String, #[arg(short, long, default_value = "4", help = "number of auditing threads")] pub concurrency: u8, #[arg(short, long, action(ArgAction::Append), value_enum, default_value = None, help = "Specific strategy to use. Default is to use all available strategies. May be passed multiple times for multiple strategies (--strategy latest --strategy random). 
Duplicates are permitted (--strategy random --strategy random).")] @@ -41,9 +43,15 @@ pub struct Args { help = "relative weight of the 'random' strategy" )] pub random_strategy_weight: u8, + #[arg( + long, + default_value = "1", + help = "relative weight of the 'fourfours' strategy" + )] + pub fourfours_strategy_weight: u8, #[arg(long, default_value = DEFAULT_STATS_PERIOD, help = "stats recording period (seconds)")] pub stats_recording_period: u64, - #[arg(short, long, action(ArgAction::Append))] + #[arg(long, action(ArgAction::Append))] pub portal_client: Vec, #[command(subcommand)] pub subcommand: Option, @@ -63,11 +71,13 @@ impl Default for Args { fn default() -> Self { Self { database_url: DEFAULT_DB_URL.to_string(), + provider_url: "".to_string(), concurrency: 4, latest_strategy_weight: 1, failed_strategy_weight: 1, oldest_strategy_weight: 1, random_strategy_weight: 1, + fourfours_strategy_weight: 1, strategy: None, portal_client: vec!["ipc:////tmp/trin-jsonrpc.ipc".to_owned()], subcommand: None, diff --git a/glados-audit/src/lib.rs b/glados-audit/src/lib.rs index b9db6efc..dcaa0c76 100644 --- a/glados-audit/src/lib.rs +++ b/glados-audit/src/lib.rs @@ -37,6 +37,8 @@ pub(crate) mod validation; pub struct AuditConfig { /// For Glados-related data. pub database_url: String, + /// For getting on-the-fly block information. + pub provider_url: String, /// Specific strategies to run. pub strategies: Vec, /// Weight for each strategy. @@ -65,6 +67,10 @@ impl AuditConfig { "Selected concurrency set." 
) } + if args.provider_url.is_empty() && args.fourfours_strategy_weight > 0 { + error!("No provider URL provided."); + return Err(anyhow::anyhow!("No provider URL provided.")); + } let strategies = match args.strategy { Some(s) => s, None => { @@ -73,6 +79,7 @@ impl AuditConfig { SelectionStrategy::Random, SelectionStrategy::Failed, SelectionStrategy::SelectOldestUnaudited, + SelectionStrategy::FourFours, ] } }; @@ -83,6 +90,7 @@ impl AuditConfig { SelectionStrategy::Random => args.random_strategy_weight, SelectionStrategy::Failed => args.failed_strategy_weight, SelectionStrategy::SelectOldestUnaudited => args.oldest_strategy_weight, + SelectionStrategy::FourFours => args.fourfours_strategy_weight, SelectionStrategy::SpecificContentKey => 0, }; weights.insert(strat.clone(), weight); @@ -96,6 +104,7 @@ impl AuditConfig { } Ok(AuditConfig { database_url: args.database_url, + provider_url: args.provider_url, strategies, weights, concurrency: args.concurrency, @@ -160,6 +169,7 @@ pub async fn run_glados_audit(conn: DatabaseConnection, config: AuditConfig) { strategy.clone(), tx, conn.clone(), + config.clone(), )); } // Collation of generated tasks, taken proportional to weights. 
diff --git a/glados-audit/src/selection.rs b/glados-audit/src/selection.rs
index 2cdaee27..9e47124e 100644
--- a/glados-audit/src/selection.rs
+++ b/glados-audit/src/selection.rs
@@ -2,6 +2,7 @@ use std::collections::HashSet;
 
 use chrono::{DateTime, TimeZone, Utc};
 use ethportal_api::HistoryContentKey;
+use glados_core::db::store_block_keys;
 use rand::{thread_rng, Rng};
 use sea_orm::{
     ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
@@ -17,17 +18,26 @@ use entity::{
     content::{self, Model},
     content_audit::{self, SelectionStrategy},
 };
+use web3::types::{BlockId, BlockNumber};
 
-use crate::AuditTask;
+use crate::{AuditConfig, AuditTask};
+
+pub const MERGE_BLOCK_HEIGHT: i32 = 15537393;
 
 pub async fn start_audit_selection_task(
     strategy: SelectionStrategy,
     tx: mpsc::Sender,
     conn: DatabaseConnection,
+    config: AuditConfig,
 ) {
     match strategy {
         SelectionStrategy::Latest => select_latest_content_for_audit(tx, conn).await,
         SelectionStrategy::Random => select_random_content_for_audit(tx, conn).await,
+        SelectionStrategy::FourFours => {
+            // Fourfours strategy downloads its own keys rather than waiting on glados-monitor to put them in the DB.
+            let w3 = web3::Web3::new(web3::transports::Http::new(&config.provider_url).unwrap());
+            select_fourfours_content_for_audit(tx, conn, w3).await
+        }
         SelectionStrategy::Failed => warn!("Need to implement SelectionStrategy::Failed"),
         SelectionStrategy::SelectOldestUnaudited => {
             select_oldest_unaudited_content_for_audit(tx, conn).await
@@ -95,6 +105,63 @@ async fn select_latest_content_for_audit(
     }
 }
 
+/// Finds and sends audit tasks for [SelectionStrategy::FourFours].
+///
+/// Once per interval tick (5 seconds):
+/// 1. Get a random block number in `1..MERGE_BLOCK_HEIGHT` (upper bound excluded).
+/// 2. Get the block hash for that block from the web3 provider.
+/// 3. Store and send content keys for header, body, receipts.
+///
+/// If the provider call fails, or returns no block or no hash, the problem is
+/// logged and the tick is skipped instead of panicking inside the loop.
+async fn select_fourfours_content_for_audit(
+    tx: mpsc::Sender,
+    conn: DatabaseConnection,
+    w3: web3::Web3,
+) -> ! {
+    let mut interval = interval(Duration::from_secs(5));
+
+    loop {
+        interval.tick().await;
+        let block_number = thread_rng().gen_range(1..MERGE_BLOCK_HEIGHT);
+        debug!(
+            strategy = "4444s",
+            "Getting hash for block number {block_number}."
+        );
+        let response = w3
+            .eth()
+            .block(BlockId::Number(BlockNumber::Number(block_number.into())))
+            .await;
+        let block_hash = match response {
+            Ok(Some(web3::types::Block { hash: Some(hash), .. })) => hash,
+            Ok(_) => {
+                warn!(
+                    strategy = "4444s",
+                    "No block hash available for block number {block_number}."
+                );
+                continue;
+            }
+            Err(err) => {
+                warn!(
+                    strategy = "4444s",
+                    err = ?err,
+                    "Failed to fetch block number {block_number} from the provider."
+                );
+                continue;
+            }
+        };
+
+        let items_to_audit =
+            store_block_keys(block_number, block_hash.as_fixed_bytes(), &conn).await;
+        debug!(
+            strategy = "4444s",
+            item_count = items_to_audit.len(),
+            "Adding content keys to the audit queue."
+        );
+        add_to_queue(tx.clone(), SelectionStrategy::FourFours, items_to_audit).await;
+    }
+}
+
 /// Adds Glados database History sub-protocol search results
 /// to a channel for auditing against a Portal Node.
 async fn add_to_queue(
diff --git a/glados-core/src/db.rs b/glados-core/src/db.rs
new file mode 100644
index 00000000..37fda5b5
--- /dev/null
+++ b/glados-core/src/db.rs
@@ -0,0 +1,98 @@
+use anyhow::Error;
+use entity::{content, execution_metadata};
+use ethportal_api::{
+    utils::bytes::hex_encode, BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, HistoryContentKey,
+    OverlayContentKey,
+};
+use sea_orm::DatabaseConnection;
+use tracing::{debug, error};
+
+/// Stores the content keys and block metadata for the given block.
+///
+/// The metadata included is the block number and hash under the execution
+/// header, body and receipts tables.
+///
+/// Errors are logged.
+pub async fn store_block_keys(
+    block_number: i32,
+    block_hash: &[u8; 32],
+    conn: &DatabaseConnection,
+) -> Vec {
+    let header = HistoryContentKey::BlockHeaderWithProof(BlockHeaderKey {
+        block_hash: *block_hash,
+    });
+    let body = HistoryContentKey::BlockBody(BlockBodyKey {
+        block_hash: *block_hash,
+    });
+    let receipts = HistoryContentKey::BlockReceipts(BlockReceiptsKey {
+        block_hash: *block_hash,
+    });
+
+    let header = store_content_key(&header, "block_header", block_number, conn).await;
+    let body = store_content_key(&body, "block_body", block_number, conn).await;
+    let receipts = store_content_key(&receipts, "block_receipts", block_number, conn).await;
+
+    let mut returned_values = vec![];
+    if let Some(header) = header {
+        returned_values.push(header);
+    }
+    if let Some(body) = body {
+        returned_values.push(body);
+    }
+    if let Some(receipts) = receipts {
+        returned_values.push(receipts);
+    }
+    returned_values
+}
+
+/// Stores a content key, then the block-number metadata for that key.
+///
+/// Returns the content model, or `None` if storing the key failed. Errors are logged.
+pub async fn store_content_key(
+    key: &T,
+    name: &str,
+    block_number: i32,
+    conn: &DatabaseConnection,
+) -> Option {
+    match content::get_or_create(key, conn).await {
+        Ok(content_model) => {
+            log_record_outcome(key, name, DbOutcome::Success);
+            // Store metadata; a failure here is logged but the model is still returned.
+            let metadata_str = format!("{name}_metadata");
+            match execution_metadata::get_or_create(content_model.id, block_number, conn).await {
+                Ok(_) => log_record_outcome(key, metadata_str.as_str(), DbOutcome::Success),
+                Err(e) => log_record_outcome(key, metadata_str.as_str(), DbOutcome::Fail(e)),
+            };
+            Some(content_model)
+        }
+        Err(e) => {
+            log_record_outcome(key, name, DbOutcome::Fail(e));
+            None
+        }
+    }
+}
+
+/// Logs the outcome of a database write for the given key.
+///
+/// Success is logged at debug level, failure (with its error) at error level.
+pub fn log_record_outcome(key: &T, name: &str, outcome: DbOutcome) {
+    match outcome {
+        DbOutcome::Success => debug!(
+            content.key = hex_encode(key.to_bytes()),
+            content.kind = name,
+            "Imported new record",
+        ),
+        DbOutcome::Fail(e) => error!(
+            content.key=hex_encode(key.to_bytes()),
+            content.kind=name,
+            err=?e,
+            "Failed to create database record",
+        ),
+    }
+}
+
+/// Result of a database write, as passed to `log_record_outcome`.
+pub enum DbOutcome {
+    Success,
+    Fail(Error),
+}
diff --git a/glados-core/src/lib.rs b/glados-core/src/lib.rs
index a230539d..0e21e819 100644
--- a/glados-core/src/lib.rs
+++ b/glados-core/src/lib.rs
@@ -1,2 +1,3 @@
+pub mod db;
 pub mod jsonrpc;
 pub mod stats;
diff --git a/glados-monitor/src/lib.rs b/glados-monitor/src/lib.rs
index f35daa0f..bc226a47 100644
--- a/glados-monitor/src/lib.rs
+++ b/glados-monitor/src/lib.rs
@@ -1,13 +1,11 @@
 use std::path::PathBuf;
 use std::time::Duration;
 
-use anyhow::{anyhow, Error, Result};
-use ethportal_api::utils::bytes::{hex_decode, hex_encode};
-use ethportal_api::{
-    BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, EpochAccumulatorKey, HistoryContentKey,
-    OverlayContentKey,
-};
+use anyhow::{anyhow, Result};
+use ethportal_api::utils::bytes::hex_decode;
+use ethportal_api::{EpochAccumulatorKey, HistoryContentKey};
 use futures::future::join_all;
+use glados_core::db::store_block_keys;
 use reqwest::header;
 use sea_orm::{ConnectionTrait, DatabaseBackend, DatabaseConnection};
 use std::env;
@@ -22,7 +20,7 @@ use web3::Web3;
 
 use url::Url;
 
-use entity::{content, execution_metadata};
+use entity::content;
 
 pub mod cli;
 
@@ -139,76 +137,6 @@ async fn fetch_block_hash(
     Ok(block_hash)
 }
 
-/// Stores the content keys and block metadata for the given block.
-///
-/// The metadata included is the block number and hash under the execution
-/// header, body and receipts tables.
-///
-/// Errors are logged.
-async fn store_block_keys(block_number: i32, block_hash: &[u8; 32], conn: &DatabaseConnection) { - let header = HistoryContentKey::BlockHeaderWithProof(BlockHeaderKey { - block_hash: *block_hash, - }); - let body = HistoryContentKey::BlockBody(BlockBodyKey { - block_hash: *block_hash, - }); - let receipts = HistoryContentKey::BlockReceipts(BlockReceiptsKey { - block_hash: *block_hash, - }); - - store_content_key(&header, "block_header", block_number, conn).await; - store_content_key(&body, "block_body", block_number, conn).await; - store_content_key(&receipts, "block_receipts", block_number, conn).await; -} - -/// Accepts a ContentKey from the History and attempts to store it. -/// -/// Errors are logged. -async fn store_content_key( - key: &T, - name: &str, - block_number: i32, - conn: &DatabaseConnection, -) { - // Store key - match content::get_or_create(key, conn).await { - Ok(content_model) => { - log_record_outcome(key, name, DbOutcome::Success); - // Store metadata - let metadata_str = format!("{name}_metadata"); - match execution_metadata::get_or_create(content_model.id, block_number, conn).await { - Ok(_) => log_record_outcome(key, metadata_str.as_str(), DbOutcome::Success), - Err(e) => log_record_outcome(key, metadata_str.as_str(), DbOutcome::Fail(e)), - }; - } - Err(e) => log_record_outcome(key, name, DbOutcome::Fail(e)), - } -} - -/// Logs a database record error for the given key. -/// -/// Helper function for common error pattern to be logged. -fn log_record_outcome(key: &T, name: &str, outcome: DbOutcome) { - match outcome { - DbOutcome::Success => debug!( - content.key = hex_encode(key.to_bytes()), - content.kind = name, - "Imported new record", - ), - DbOutcome::Fail(e) => error!( - content.key=hex_encode(key.to_bytes()), - content.kind=name, - err=?e, - "Failed to create database record", - ), - } -} - -enum DbOutcome { - Success, - Fail(Error), -} - pub async fn import_pre_merge_accumulators( conn: DatabaseConnection, base_path: PathBuf,