Skip to content

Commit

Permalink
feat: adds fourfours strategy to glados-audit
Browse files Browse the repository at this point in the history
  • Loading branch information
mrferris committed Feb 21, 2024
1 parent 98984c0 commit df1260c
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 81 deletions.
3 changes: 3 additions & 0 deletions entity/src/content_audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub enum SelectionStrategy {
SelectOldestUnaudited = 3,
/// Perform a single audit for a previously audited content key.
SpecificContentKey = 4,
/// Perform audits of random fourfours data.
FourFours = 5,
}

impl AuditResult {
Expand Down Expand Up @@ -157,6 +159,7 @@ impl SelectionStrategy {
SelectionStrategy::Latest => "Latest".to_string(),
SelectionStrategy::Random => "Random".to_string(),
SelectionStrategy::Failed => "Failed".to_string(),
SelectionStrategy::FourFours => "FourFours".to_string(),
SelectionStrategy::SelectOldestUnaudited => "Select Oldest Unaudited".to_string(),
SelectionStrategy::SpecificContentKey => "Specific Content Key".to_string(),
}
Expand Down
3 changes: 1 addition & 2 deletions glados-audit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ clap = { version = "4.0.24", features = ["derive"] }
entity = { path = "../entity" }
env_logger = "0.9.3"
ethereum-types = "0.14.0"
web3 = "0.18.0"
ethportal-api = "0.2.2"
glados-core = { path = "../glados-core" }
migration = { path = "../migration" }
Expand All @@ -24,5 +25,3 @@ serde_json = "1.0.95"
tokio = "1.21.2"
tracing = "0.1.37"
url = "2.3.1"
web3 = "0.18.0"

12 changes: 11 additions & 1 deletion glados-audit/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const DEFAULT_STATS_PERIOD: &str = "300";
pub struct Args {
#[arg(short, long, default_value = DEFAULT_DB_URL)]
pub database_url: String,
#[arg(short, long, default_value = "", help = "web3 api provider url")]
pub provider_url: String,
#[arg(short, long, default_value = "4", help = "number of auditing threads")]
pub concurrency: u8,
#[arg(short, long, action(ArgAction::Append), value_enum, default_value = None, help = "Specific strategy to use. Default is to use all available strategies. May be passed multiple times for multiple strategies (--strategy latest --strategy random). Duplicates are permitted (--strategy random --strategy random).")]
Expand Down Expand Up @@ -41,9 +43,15 @@ pub struct Args {
help = "relative weight of the 'random' strategy"
)]
pub random_strategy_weight: u8,
#[arg(
long,
default_value = "1",
help = "relative weight of the 'fourfours' strategy"
)]
pub fourfours_strategy_weight: u8,
#[arg(long, default_value = DEFAULT_STATS_PERIOD, help = "stats recording period (seconds)")]
pub stats_recording_period: u64,
#[arg(short, long, action(ArgAction::Append))]
#[arg(long, action(ArgAction::Append))]
pub portal_client: Vec<String>,
#[command(subcommand)]
pub subcommand: Option<Command>,
Expand All @@ -63,11 +71,13 @@ impl Default for Args {
fn default() -> Self {
Self {
database_url: DEFAULT_DB_URL.to_string(),
provider_url: "".to_string(),
concurrency: 4,
latest_strategy_weight: 1,
failed_strategy_weight: 1,
oldest_strategy_weight: 1,
random_strategy_weight: 1,
fourfours_strategy_weight: 1,
strategy: None,
portal_client: vec!["ipc:////tmp/trin-jsonrpc.ipc".to_owned()],
subcommand: None,
Expand Down
10 changes: 10 additions & 0 deletions glados-audit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub(crate) mod validation;
pub struct AuditConfig {
/// For Glados-related data.
pub database_url: String,
/// For getting on-the-fly block information.
pub provider_url: String,
/// Specific strategies to run.
pub strategies: Vec<SelectionStrategy>,
/// Weight for each strategy.
Expand Down Expand Up @@ -65,6 +67,10 @@ impl AuditConfig {
"Selected concurrency set."
)
}
if args.provider_url.is_empty() && args.fourfours_strategy_weight > 0 {
error!("No provider URL provided.");
return Err(anyhow::anyhow!("No provider URL provided."));
}
let strategies = match args.strategy {
Some(s) => s,
None => {
Expand All @@ -73,6 +79,7 @@ impl AuditConfig {
SelectionStrategy::Random,
SelectionStrategy::Failed,
SelectionStrategy::SelectOldestUnaudited,
SelectionStrategy::FourFours,
]
}
};
Expand All @@ -83,6 +90,7 @@ impl AuditConfig {
SelectionStrategy::Random => args.random_strategy_weight,
SelectionStrategy::Failed => args.failed_strategy_weight,
SelectionStrategy::SelectOldestUnaudited => args.oldest_strategy_weight,
SelectionStrategy::FourFours => args.fourfours_strategy_weight,
SelectionStrategy::SpecificContentKey => 0,
};
weights.insert(strat.clone(), weight);
Expand All @@ -96,6 +104,7 @@ impl AuditConfig {
}
Ok(AuditConfig {
database_url: args.database_url,
provider_url: args.provider_url,
strategies,
weights,
concurrency: args.concurrency,
Expand Down Expand Up @@ -160,6 +169,7 @@ pub async fn run_glados_audit(conn: DatabaseConnection, config: AuditConfig) {
strategy.clone(),
tx,
conn.clone(),
config.clone(),
));
}
// Collation of generated tasks, taken proportional to weights.
Expand Down
52 changes: 51 additions & 1 deletion glados-audit/src/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashSet;

use chrono::{DateTime, TimeZone, Utc};
use ethportal_api::HistoryContentKey;
use glados_core::db::store_block_keys;
use rand::{thread_rng, Rng};
use sea_orm::{
ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
Expand All @@ -17,17 +18,26 @@ use entity::{
content::{self, Model},
content_audit::{self, SelectionStrategy},
};
use web3::types::{BlockId, BlockNumber};

use crate::AuditTask;
use crate::{AuditConfig, AuditTask};

pub const MERGE_BLOCK_HEIGHT: i32 = 15537393;

pub async fn start_audit_selection_task(
strategy: SelectionStrategy,
tx: mpsc::Sender<AuditTask>,
conn: DatabaseConnection,
config: AuditConfig,
) {
match strategy {
SelectionStrategy::Latest => select_latest_content_for_audit(tx, conn).await,
SelectionStrategy::Random => select_random_content_for_audit(tx, conn).await,
SelectionStrategy::FourFours => {
// Fourfours strategy downloads its own keys rather than waiting on glados-monitor to put them in the DB.
let w3 = web3::Web3::new(web3::transports::Http::new(&config.provider_url).unwrap());
select_fourfours_content_for_audit(tx, conn, w3).await
}
SelectionStrategy::Failed => warn!("Need to implement SelectionStrategy::Failed"),
SelectionStrategy::SelectOldestUnaudited => {
select_oldest_unaudited_content_for_audit(tx, conn).await
Expand Down Expand Up @@ -95,6 +105,46 @@ async fn select_latest_content_for_audit(
}
}

/// Finds and sends audit tasks for [SelectionStrategy::FourFours].
///
/// 1. Get a random block number between 1 and MERGE_BLOCK_HEIGHT.
/// 2. Get the block hash for that block.
/// 3. Send content keys for header, body, receipts.
///
async fn select_fourfours_content_for_audit(
tx: mpsc::Sender<AuditTask>,
conn: DatabaseConnection,
w3: web3::Web3<web3::transports::Http>,
) -> ! {
let mut interval = interval(Duration::from_secs(5));

loop {
interval.tick().await;
let block_number = thread_rng().gen_range(1..MERGE_BLOCK_HEIGHT);
debug!(
strategy = "4444s",
"Getting hash for block number {block_number}."
);
let block_hash = w3
.eth()
.block(BlockId::Number(BlockNumber::Number(block_number.into())))
.await
.unwrap()
.unwrap()
.hash
.unwrap();

let items_to_audit =
store_block_keys(block_number, block_hash.as_fixed_bytes(), &conn).await;
debug!(
strategy = "4444s",
item_count = items_to_audit.len(),
"Adding content keys to the audit queue."
);
add_to_queue(tx.clone(), SelectionStrategy::FourFours, items_to_audit).await;
}
}

/// Adds Glados database History sub-protocol search results
/// to a channel for auditing against a Portal Node.
async fn add_to_queue(
Expand Down
98 changes: 98 additions & 0 deletions glados-core/src/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use anyhow::Error;
use entity::{content, execution_metadata};
use ethportal_api::{
utils::bytes::hex_encode, BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, HistoryContentKey,
OverlayContentKey,
};
use sea_orm::DatabaseConnection;
use tracing::{debug, error};

/// Stores the content keys and block metadata for the given block.
///
/// The metadata included is the block number and hash under the execution
/// header, body and receipts tables.
///
/// Errors are logged.
pub async fn store_block_keys(
block_number: i32,
block_hash: &[u8; 32],
conn: &DatabaseConnection,
) -> Vec<content::Model> {
let header = HistoryContentKey::BlockHeaderWithProof(BlockHeaderKey {
block_hash: *block_hash,
});
let body = HistoryContentKey::BlockBody(BlockBodyKey {
block_hash: *block_hash,
});
let receipts = HistoryContentKey::BlockReceipts(BlockReceiptsKey {
block_hash: *block_hash,
});

let header = store_content_key(&header, "block_header", block_number, conn).await;
let body = store_content_key(&body, "block_body", block_number, conn).await;
let receipts = store_content_key(&receipts, "block_receipts", block_number, conn).await;

let mut returned_values = vec![];
if let Some(header) = header {
returned_values.push(header);
}
if let Some(body) = body {
returned_values.push(body);
}
if let Some(receipts) = receipts {
returned_values.push(receipts);
}
returned_values
}

/// Accepts a ContentKey from the History and attempts to store it.
///
/// Errors are logged.
pub async fn store_content_key<T: OverlayContentKey>(
key: &T,
name: &str,
block_number: i32,
conn: &DatabaseConnection,
) -> Option<content::Model> {
// Store key
match content::get_or_create(key, conn).await {
Ok(content_model) => {
log_record_outcome(key, name, DbOutcome::Success);
// Store metadata
let metadata_str = format!("{name}_metadata");
match execution_metadata::get_or_create(content_model.id, block_number, conn).await {
Ok(_) => log_record_outcome(key, metadata_str.as_str(), DbOutcome::Success),
Err(e) => log_record_outcome(key, metadata_str.as_str(), DbOutcome::Fail(e)),
};
Some(content_model)
}
Err(e) => {
log_record_outcome(key, name, DbOutcome::Fail(e));
None
}
}
}

/// Logs a database record error for the given key.
///
/// Helper function for common error pattern to be logged.
pub fn log_record_outcome<T: OverlayContentKey>(key: &T, name: &str, outcome: DbOutcome) {
match outcome {
DbOutcome::Success => debug!(
content.key = hex_encode(key.to_bytes()),
content.kind = name,
"Imported new record",
),
DbOutcome::Fail(e) => error!(
content.key=hex_encode(key.to_bytes()),
content.kind=name,
err=?e,
"Failed to create database record",
),
}
}

pub enum DbOutcome {
Success,
Fail(Error),
}
1 change: 1 addition & 0 deletions glados-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod db;
pub mod jsonrpc;
pub mod stats;
Loading

0 comments on commit df1260c

Please sign in to comment.