Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into higher-throughput
Browse files Browse the repository at this point in the history
  • Loading branch information
jinmel committed Oct 3, 2024
2 parents da65b15 + 4174fd8 commit 2395502
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 108 deletions.
40 changes: 40 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: Rust

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

env:
CARGO_TERM_COLOR: always

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Build
run: cargo build --verbose --workspace

test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- name: Install Foundry
uses: foundry-rs/foundry-toolchain@v1
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
- name: Run tests
run: cargo test --verbose --workspace
90 changes: 35 additions & 55 deletions sqlite_db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use alloy::{
hex::{FromHex, ToHexExt},
primitives::{Address, TxHash},
};
use contender_core::db::{DbOps, RunTx};
use contender_core::db::{DbOps, NamedTx, RunTx};
use contender_core::{error::ContenderError, Result};
use r2d2::{Pool, PooledConnection};
use r2d2_sqlite::SqliteConnectionManager;
Expand Down Expand Up @@ -65,6 +65,18 @@ struct NamedTxRow {
contract_address: Option<String>,
}

impl From<NamedTxRow> for NamedTx {
fn from(row: NamedTxRow) -> Self {
let tx_hash = TxHash::from_hex(&row.tx_hash).expect("invalid tx hash");
let contract_address = row
.contract_address
.map(|a| Address::from_hex(&a))
.transpose()
.expect("invalid address");
NamedTx::new(row.name, tx_hash, contract_address)
}
}

impl NamedTxRow {
fn from_row(row: &Row) -> rusqlite::Result<Self> {
Ok(Self {
Expand Down Expand Up @@ -184,30 +196,14 @@ impl DbOps for SqliteDb {
Ok(res)
}

fn insert_named_tx(
&self,
name: String,
tx_hash: TxHash,
contract_address: Option<Address>,
) -> Result<()> {
self.execute(
"INSERT INTO named_txs (name, tx_hash, contract_address) VALUES (?, ?, ?)",
params![
name,
tx_hash.encode_hex(),
contract_address.map(|a| a.encode_hex())
],
)
}

fn insert_named_txs(&self, named_txs: Vec<(String, TxHash, Option<Address>)>) -> Result<()> {
fn insert_named_txs(&self, named_txs: Vec<NamedTx>) -> Result<()> {
let pool = self.get_pool()?;
let stmts = named_txs.iter().map(|(name, tx_hash, contract_address)| {
let stmts = named_txs.iter().map(|tx| {
format!(
"INSERT INTO named_txs (name, tx_hash, contract_address) VALUES ('{}', '{}', '{}');",
name,
tx_hash.encode_hex(),
contract_address.map(|a| a.encode_hex()).unwrap_or_default()
tx.name,
tx.tx_hash.encode_hex(),
tx.address.map(|a| a.encode_hex()).unwrap_or_default()
)
});
pool.execute_batch(&format!(
Expand All @@ -222,7 +218,7 @@ impl DbOps for SqliteDb {
Ok(())
}

fn get_named_tx(&self, name: &str) -> Result<(TxHash, Option<Address>)> {
fn get_named_tx(&self, name: &str) -> Result<NamedTx> {
let pool = self.get_pool()?;
let mut stmt = pool
.prepare(
Expand All @@ -238,15 +234,7 @@ impl DbOps for SqliteDb {
.transpose()
.map_err(|e| ContenderError::with_err(e, "no row found"))?
.ok_or(ContenderError::DbError("no existing row", None))?;

let tx_hash = TxHash::from_hex(&res.tx_hash)
.map_err(|e| ContenderError::with_err(e, "invalid tx hash"))?;
let contract_address = res
.contract_address
.map(|a| Address::from_hex(&a))
.transpose()
.map_err(|e| ContenderError::with_err(e, "invalid address"))?;
Ok((tx_hash, contract_address))
Ok(res.into())
}

fn insert_run_tx(&self, run_id: u64, run_tx: RunTx) -> Result<()> {
Expand Down Expand Up @@ -315,32 +303,16 @@ mod tests {
}

#[test]
fn inserts_named_tx() {
fn inserts_and_gets_named_txs() {
let db = SqliteDb::new_memory();
db.create_tables().unwrap();
let tx_hash = TxHash::from_slice(&[0u8; 32]);
let contract_address = Some(Address::from_slice(&[0u8; 20]));
db.insert_named_tx("test_tx".to_string(), tx_hash, contract_address)
.unwrap();
let count: i64 = db
.get_pool()
.unwrap()
.query_row("SELECT COUNT(*) FROM named_txs", params![], |row| {
row.get(0)
})
.unwrap();
assert_eq!(count, 1);
}

#[test]
fn insert_named_txs() {
let db = SqliteDb::new_memory();
db.create_tables().unwrap();
let tx_hash = TxHash::from_slice(&[0u8; 32]);
let contract_address = Some(Address::from_slice(&[0u8; 20]));
let contract_address = Some(Address::from_slice(&[4u8; 20]));
let name1 = "test_tx".to_string();
let name2 = "test_tx2";
db.insert_named_txs(vec![
("test_tx".to_string(), tx_hash, contract_address),
("test_tx2".to_string(), tx_hash, contract_address),
NamedTx::new(name1.to_owned(), tx_hash, contract_address),
NamedTx::new(name2.to_string(), tx_hash, contract_address),
])
.unwrap();
let count: i64 = db
Expand All @@ -351,10 +323,15 @@ mod tests {
})
.unwrap();
assert_eq!(count, 2);

let res1 = db.get_named_tx(&name1).unwrap();
assert_eq!(res1.name, name1);
assert_eq!(res1.tx_hash, tx_hash);
assert_eq!(res1.address, contract_address);
}

#[test]
fn inserts_run_txs() {
fn inserts_and_gets_run_txs() {
let db = SqliteDb::new_memory();
db.create_tables().unwrap();
let run_id = db.insert_run(100000, 100).unwrap();
Expand Down Expand Up @@ -383,5 +360,8 @@ mod tests {
.query_row("SELECT COUNT(*) FROM run_txs", params![], |row| row.get(0))
.unwrap();
assert_eq!(count, 2);

let res = db.get_run_txs(run_id as u64).unwrap();
assert_eq!(res.len(), 2);
}
}
55 changes: 28 additions & 27 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,29 @@ pub struct RunTx {
pub kind: Option<String>,
}

#[derive(Debug, Serialize, Clone)]
pub struct NamedTx {
pub name: String,
pub tx_hash: TxHash,
pub address: Option<Address>,
}

impl NamedTx {
pub fn new(name: String, tx_hash: TxHash, address: Option<Address>) -> Self {
Self {
name,
tx_hash,
address,
}
}
}

impl From<NamedTx> for Vec<NamedTx> {
fn from(named_tx: NamedTx) -> Self {
vec![named_tx]
}
}

pub trait DbOps {
fn create_tables(&self) -> Result<()>;

Expand All @@ -23,18 +46,9 @@ pub trait DbOps {

fn num_runs(&self) -> Result<u64>;

fn insert_named_tx(
&self,
name: String,
tx_hash: TxHash,
contract_address: Option<Address>,
) -> Result<()>;

fn insert_named_txs(&self, named_txs: Vec<(String, TxHash, Option<Address>)>) -> Result<()>;

fn get_named_tx(&self, name: &str) -> Result<(TxHash, Option<Address>)>;
fn insert_named_txs(&self, named_txs: Vec<NamedTx>) -> Result<()>;

fn insert_run_tx(&self, run_id: u64, tx: RunTx) -> Result<()>;
fn get_named_tx(&self, name: &str) -> Result<NamedTx>;

fn insert_run_txs(&self, run_id: u64, run_txs: Vec<RunTx>) -> Result<()>;

Expand All @@ -56,25 +70,12 @@ impl DbOps for MockDb {
Ok(0)
}

fn insert_named_tx(
&self,
_name: String,
_tx_hash: TxHash,
_contract_address: Option<Address>,
) -> Result<()> {
fn insert_named_txs(&self, _named_txs: Vec<NamedTx>) -> Result<()> {
Ok(())
}

fn insert_named_txs(&self, _named_txs: Vec<(String, TxHash, Option<Address>)>) -> Result<()> {
Ok(())
}

fn get_named_tx(&self, _name: &str) -> Result<(TxHash, Option<Address>)> {
Ok((TxHash::default(), None))
}

fn insert_run_tx(&self, _run_id: u64, _tx: RunTx) -> Result<()> {
Ok(())
fn get_named_tx(&self, _name: &str) -> Result<NamedTx> {
Ok(NamedTx::new(String::default(), TxHash::default(), None))
}

fn insert_run_txs(&self, _run_id: u64, _run_txs: Vec<RunTx>) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/generator/templater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ where
placeholder_map.insert(
template_key,
template_value
.1
.address
.map(|a| self.encode_contract_address(&a))
.unwrap_or_default(),
);
Expand Down
13 changes: 4 additions & 9 deletions src/spammer/blockwise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
.map(|slice| slice.to_vec())
.collect();
let mut block_offset = 0;
let mut last_block_number;
let mut last_block_number = 0;

// get chain id before we start spamming
let chain_id = self
Expand All @@ -93,11 +93,7 @@ where

let mut tasks = vec![];
let mut gas_limits = HashMap::<FixedBytes<4>, u128>::new();
let first_block_number = self
.rpc_client
.get_block_number()
.await
.map_err(|e| ContenderError::with_err(e, "failed to get block number"))?;

// get nonce for each signer and put it into a hashmap
let mut nonces = HashMap::new();
for (addr, _) in self.scenario.wallet_map.iter() {
Expand All @@ -119,7 +115,7 @@ where
.await
.map_err(|e| ContenderError::with_err(e, "failed to get block"))?
.ok_or(ContenderError::SpamError("no block found", None))?;
last_block_number = block.header.number + 1;
last_block_number = block.header.number;

// get gas price
let gas_price = self
Expand Down Expand Up @@ -248,8 +244,7 @@ where
let _ = task.await;
}

// re-iterate through target block range in case there are any txs left in the cache
last_block_number = first_block_number;
// wait until there are no txs left in the cache, or until we time out
let mut timeout_counter = 0;
if let Some(run_id) = run_id {
loop {
Expand Down
23 changes: 14 additions & 9 deletions src/spammer/tx_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ where
target_block_num,
} => {
println!("unconfirmed txs: {}", self.cache.len());
let receipts = self
.rpc
.get_block_receipts(target_block_num.into())
.await?
.unwrap_or_default();
println!("found {} receipts", receipts.len());
let mut maybe_block;
loop {
maybe_block = self
Expand All @@ -109,15 +103,26 @@ where
if maybe_block.is_some() {
break;
}
println!("waiting for block {}", target_block_num);
std::thread::sleep(Duration::from_secs(1));
}
let target_block = maybe_block.expect("this should never happen");
let receipts = self
.rpc
.get_block_receipts(target_block_num.into())
.await?
.unwrap_or_default();
println!(
"found {} receipts for block #{}",
receipts.len(),
target_block_num
);
// filter for txs that were included in the block
let receipt_tx_hashes = receipts
.iter()
.map(|r| r.transaction_hash)
.collect::<Vec<_>>();
let pending_txs = self
let confirmed_txs = self
.cache
.iter()
.filter(|tx| receipt_tx_hashes.contains(&tx.tx_hash))
Expand All @@ -128,13 +133,13 @@ where
let new_txs = &self
.cache
.iter()
.filter(|tx| !pending_txs.contains(tx))
.filter(|tx| !confirmed_txs.contains(tx))
.map(|tx| tx.to_owned())
.collect::<Vec<_>>();
self.cache = new_txs.to_vec();

// ready to go to the DB
let run_txs = pending_txs
let run_txs = confirmed_txs
.into_iter()
.map(|pending_tx| {
let receipt = receipts
Expand Down
Loading

0 comments on commit 2395502

Please sign in to comment.