Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds fourfours strategy to glados-audit #231

Merged
merged 2 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions entity/src/audit_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct Model {
pub success_rate_latest: f32,
pub success_rate_random: f32,
pub success_rate_oldest: f32,
pub success_rate_four_fours: f32,
pub success_rate_all_headers: f32,
pub success_rate_all_bodies: f32,
pub success_rate_all_receipts: f32,
Expand All @@ -25,6 +26,9 @@ pub struct Model {
pub success_rate_random_headers: f32,
pub success_rate_random_bodies: f32,
pub success_rate_random_receipts: f32,
pub success_rate_four_fours_headers: f32,
pub success_rate_four_fours_bodies: f32,
pub success_rate_four_fours_receipts: f32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand All @@ -40,6 +44,7 @@ pub async fn create(
success_rate_latest: f32,
success_rate_random: f32,
success_rate_oldest: f32,
success_rate_four_fours: f32,
success_rate_all_headers: f32,
success_rate_all_bodies: f32,
success_rate_all_receipts: f32,
Expand All @@ -49,6 +54,9 @@ pub async fn create(
success_rate_random_headers: f32,
success_rate_random_bodies: f32,
success_rate_random_receipts: f32,
success_rate_four_fours_headers: f32,
success_rate_four_fours_bodies: f32,
success_rate_four_fours_receipts: f32,
conn: &DatabaseConnection,
) -> Result<Model> {
let audit_stats = ActiveModel {
Expand All @@ -59,6 +67,7 @@ pub async fn create(
success_rate_latest: Set(success_rate_latest),
success_rate_random: Set(success_rate_random),
success_rate_oldest: Set(success_rate_oldest),
success_rate_four_fours: Set(success_rate_four_fours),
success_rate_all_headers: Set(success_rate_all_headers),
success_rate_all_bodies: Set(success_rate_all_bodies),
success_rate_all_receipts: Set(success_rate_all_receipts),
Expand All @@ -68,6 +77,9 @@ pub async fn create(
success_rate_random_headers: Set(success_rate_random_headers),
success_rate_random_bodies: Set(success_rate_random_bodies),
success_rate_random_receipts: Set(success_rate_random_receipts),
success_rate_four_fours_headers: Set(success_rate_four_fours_headers),
success_rate_four_fours_bodies: Set(success_rate_four_fours_bodies),
success_rate_four_fours_receipts: Set(success_rate_four_fours_receipts),
};
Ok(audit_stats.insert(conn).await?)
}
Expand Down
3 changes: 3 additions & 0 deletions entity/src/content_audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub enum SelectionStrategy {
SelectOldestUnaudited = 3,
/// Perform a single audit for a previously audited content key.
SpecificContentKey = 4,
/// Perform audits of random fourfours data.
FourFours = 5,
}

impl AuditResult {
Expand Down Expand Up @@ -157,6 +159,7 @@ impl SelectionStrategy {
SelectionStrategy::Latest => "Latest".to_string(),
SelectionStrategy::Random => "Random".to_string(),
SelectionStrategy::Failed => "Failed".to_string(),
SelectionStrategy::FourFours => "FourFours".to_string(),
SelectionStrategy::SelectOldestUnaudited => "Select Oldest Unaudited".to_string(),
SelectionStrategy::SpecificContentKey => "Specific Content Key".to_string(),
}
Expand Down
3 changes: 1 addition & 2 deletions glados-audit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ clap = { version = "4.0.24", features = ["derive"] }
entity = { path = "../entity" }
env_logger = "0.9.3"
ethereum-types = "0.14.0"
web3 = "0.18.0"
ethportal-api = "0.2.2"
glados-core = { path = "../glados-core" }
migration = { path = "../migration" }
Expand All @@ -24,5 +25,3 @@ serde_json = "1.0.95"
tokio = "1.21.2"
tracing = "0.1.37"
url = "2.3.1"
web3 = "0.18.0"

37 changes: 36 additions & 1 deletion glados-audit/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ const DEFAULT_STATS_PERIOD: &str = "300";
pub struct Args {
#[arg(short, long, default_value = DEFAULT_DB_URL)]
pub database_url: String,
#[arg(
short,
long,
default_value = "",
help = "web3 api provider url, eg https://mainnet.infura.io/v3/..."
)]
pub provider_url: String,
#[arg(short, long, default_value = "4", help = "number of auditing threads")]
pub concurrency: u8,
#[arg(short, long, action(ArgAction::Append), value_enum, default_value = None, help = "Specific strategy to use. Default is to use all available strategies. May be passed multiple times for multiple strategies (--strategy latest --strategy random). Duplicates are permitted (--strategy random --strategy random).")]
Expand Down Expand Up @@ -41,9 +48,15 @@ pub struct Args {
help = "relative weight of the 'random' strategy"
)]
pub random_strategy_weight: u8,
#[arg(
long,
default_value = "1",
help = "relative weight of the 'four_fours' strategy"
)]
pub four_fours_strategy_weight: u8,
#[arg(long, default_value = DEFAULT_STATS_PERIOD, help = "stats recording period (seconds)")]
pub stats_recording_period: u64,
#[arg(short, long, action(ArgAction::Append))]
#[arg(long, action(ArgAction::Append))]
pub portal_client: Vec<String>,
#[command(subcommand)]
pub subcommand: Option<Command>,
Expand All @@ -63,11 +76,13 @@ impl Default for Args {
fn default() -> Self {
Self {
database_url: DEFAULT_DB_URL.to_string(),
provider_url: "".to_string(),
concurrency: 4,
latest_strategy_weight: 1,
failed_strategy_weight: 1,
oldest_strategy_weight: 1,
random_strategy_weight: 1,
four_fours_strategy_weight: 1,
strategy: None,
portal_client: vec!["ipc:////tmp/trin-jsonrpc.ipc".to_owned()],
subcommand: None,
Expand Down Expand Up @@ -162,6 +177,26 @@ mod test {
};
assert_eq!(result, expected);
}

/// Tests that the provider_url is passed through properly.
#[test]
fn test_provider_url() {
const PORTAL_CLIENT_STRING: &str = "ipc:////path/to/ipc";
const PROVIDER_URL: &str = "https://example.io/key";
let result = Args::parse_from([
"test",
"--provider-url",
PROVIDER_URL,
"--portal-client",
PORTAL_CLIENT_STRING,
]);
let expected = Args {
provider_url: PROVIDER_URL.to_string(),
portal_client: vec![PORTAL_CLIENT_STRING.to_owned()],
..Default::default()
};
assert_eq!(result, expected);
}
}

/// Used by a user to specify the intended form of transport
Expand Down
13 changes: 12 additions & 1 deletion glados-audit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub(crate) mod validation;
pub struct AuditConfig {
/// For Glados-related data.
pub database_url: String,
/// For getting on-the-fly block information.
pub provider_url: String,
/// Specific strategies to run.
pub strategies: Vec<SelectionStrategy>,
/// Weight for each strategy.
Expand Down Expand Up @@ -65,6 +67,7 @@ impl AuditConfig {
"Selected concurrency set."
)
}

let strategies = match args.strategy {
Some(s) => s,
None => {
Expand All @@ -73,6 +76,7 @@ impl AuditConfig {
SelectionStrategy::Random,
SelectionStrategy::Failed,
SelectionStrategy::SelectOldestUnaudited,
SelectionStrategy::FourFours,
]
}
};
Expand All @@ -83,11 +87,16 @@ impl AuditConfig {
SelectionStrategy::Random => args.random_strategy_weight,
SelectionStrategy::Failed => args.failed_strategy_weight,
SelectionStrategy::SelectOldestUnaudited => args.oldest_strategy_weight,
SelectionStrategy::FourFours => args.four_fours_strategy_weight,
SelectionStrategy::SpecificContentKey => 0,
};
weights.insert(strat.clone(), weight);
}

if args.provider_url.is_empty() && strategies.contains(&SelectionStrategy::FourFours) {
return Err(anyhow::anyhow!(
"No provider URL provided, required when `four_fours` strategy is enabled."
));
}
let mut portal_clients: Vec<PortalClient> = vec![];
for client_url in args.portal_client {
let client = PortalClient::from(client_url).await?;
Expand All @@ -96,6 +105,7 @@ impl AuditConfig {
}
Ok(AuditConfig {
database_url: args.database_url,
provider_url: args.provider_url,
strategies,
weights,
concurrency: args.concurrency,
Expand Down Expand Up @@ -160,6 +170,7 @@ pub async fn run_glados_audit(conn: DatabaseConnection, config: AuditConfig) {
strategy.clone(),
tx,
conn.clone(),
config.clone(),
));
}
// Collation of generated tasks, taken proportional to weights.
Expand Down
52 changes: 51 additions & 1 deletion glados-audit/src/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashSet;

use chrono::{DateTime, TimeZone, Utc};
use ethportal_api::HistoryContentKey;
use glados_core::db::store_block_keys;
use rand::{thread_rng, Rng};
use sea_orm::{
ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
Expand All @@ -17,17 +18,26 @@ use entity::{
content::{self, Model},
content_audit::{self, SelectionStrategy},
};
use web3::types::{BlockId, BlockNumber};

use crate::AuditTask;
use crate::{AuditConfig, AuditTask};

pub const MERGE_BLOCK_HEIGHT: i32 = 15537393;

pub async fn start_audit_selection_task(
strategy: SelectionStrategy,
tx: mpsc::Sender<AuditTask>,
conn: DatabaseConnection,
config: AuditConfig,
) {
match strategy {
SelectionStrategy::Latest => select_latest_content_for_audit(tx, conn).await,
SelectionStrategy::Random => select_random_content_for_audit(tx, conn).await,
SelectionStrategy::FourFours => {
// Fourfours strategy downloads its own keys rather than waiting on glados-monitor to put them in the DB.
let w3 = web3::Web3::new(web3::transports::Http::new(&config.provider_url).unwrap());
select_fourfours_content_for_audit(tx, conn, w3).await
}
SelectionStrategy::Failed => warn!("Need to implement SelectionStrategy::Failed"),
SelectionStrategy::SelectOldestUnaudited => {
select_oldest_unaudited_content_for_audit(tx, conn).await
Expand Down Expand Up @@ -95,6 +105,46 @@ async fn select_latest_content_for_audit(
}
}

/// Finds and sends audit tasks for [SelectionStrategy::FourFours].
///
/// 1. Get a random block number between 1 and MERGE_BLOCK_HEIGHT.
/// 2. Get the block hash for that block.
/// 3. Send content keys for header, body, receipts.
///
async fn select_fourfours_content_for_audit(
tx: mpsc::Sender<AuditTask>,
conn: DatabaseConnection,
w3: web3::Web3<web3::transports::Http>,
) -> ! {
let mut interval = interval(Duration::from_secs(5));

loop {
interval.tick().await;
let block_number = thread_rng().gen_range(1..MERGE_BLOCK_HEIGHT);
debug!(
strategy = "4444s",
"Getting hash for block number {block_number}."
);
let block_hash = w3
.eth()
.block(BlockId::Number(BlockNumber::Number(block_number.into())))
.await
.unwrap()
.unwrap()
.hash
.unwrap();

let items_to_audit =
store_block_keys(block_number, block_hash.as_fixed_bytes(), &conn).await;
debug!(
strategy = "4444s",
item_count = items_to_audit.len(),
"Adding content keys to the audit queue."
);
add_to_queue(tx.clone(), SelectionStrategy::FourFours, items_to_audit).await;
}
}

/// Adds Glados database History sub-protocol search results
/// to a channel for auditing against a Portal Node.
async fn add_to_queue(
Expand Down
Loading