Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pool): add time to mine tracking and metrics #729

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 118 additions & 10 deletions crates/pool/src/mempool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// If not, see https://www.gnu.org/licenses/.

use std::{
cmp::Ordering,
cmp::{self, Ordering},
collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use anyhow::Context;
Expand All @@ -24,7 +25,7 @@ use ethers::{
};
use rundler_types::{
pool::{MempoolError, PoolOperation},
Entity, EntityType, Timestamp, UserOperation, UserOperationId, UserOperationVariant,
Entity, EntityType, GasFees, Timestamp, UserOperation, UserOperationId, UserOperationVariant,
};
use rundler_utils::math;
use tracing::info;
Expand Down Expand Up @@ -81,6 +82,10 @@ pub(crate) struct PoolInner {
pool_size: SizeTracker,
/// keeps track of the size of the removed cache in bytes
cache_size: SizeTracker,
/// The time of the previous block
prev_sys_block_time: Duration,
/// The number of the previous block
prev_block_number: u64,
}

impl PoolInner {
Expand All @@ -96,6 +101,8 @@ impl PoolInner {
submission_id: 0,
pool_size: SizeTracker::default(),
cache_size: SizeTracker::default(),
prev_sys_block_time: Duration::default(),
prev_block_number: 0,
}
}

Expand Down Expand Up @@ -145,22 +152,54 @@ impl PoolInner {
self.best.clone().into_iter().map(|v| v.po)
}

/// Removes all operations using the given entity, returning the hashes of the removed operations.
/// Does maintenance on the pool.
///
/// 1) Removes all operations using the given entity, returning the hashes of the removed operations.
/// 2) Updates time to mine stats for all operations in the pool.
///
/// NOTE: This method is O(n) where n is the number of operations in the pool.
/// It should be called sparingly (e.g. when a block is mined).
pub(crate) fn remove_expired(&mut self, expire_before: Timestamp) -> Vec<(H256, Timestamp)> {
pub(crate) fn do_maintenance(
&mut self,
block_number: u64,
block_timestamp: Timestamp,
candidate_gas_fees: GasFees,
base_fee: U256,
) -> Vec<(H256, Timestamp)> {
let sys_block_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time should be after epoch");

let block_delta_time = sys_block_time - self.prev_sys_block_time;
let block_delta_height = block_number - self.prev_block_number;
dancoombs marked this conversation as resolved.
Show resolved Hide resolved
let mut expired = Vec::new();
for (hash, op) in &self.by_hash {
if op.po.valid_time_range.valid_until < expire_before {
let mut num_candidates = 0;

for (hash, op) in &mut self.by_hash {
if op.po.valid_time_range.valid_until < block_timestamp {
expired.push((*hash, op.po.valid_time_range.valid_until));
}

num_candidates += if op.update_time_to_mine(
block_delta_time,
block_delta_height,
candidate_gas_fees,
base_fee,
) {
1
} else {
0
};
}

for (hash, _) in &expired {
self.remove_operation_by_hash(*hash);
}

PoolMetrics::set_num_candidates(num_candidates, self.config.entry_point);
self.prev_block_number = block_number;
self.prev_sys_block_time = sys_block_time;

expired
}

Expand Down Expand Up @@ -243,6 +282,7 @@ impl PoolInner {
block_number: u64,
) -> Option<Arc<PoolOperation>> {
let tx_in_pool = self.by_id.get(&mined_op.id())?;
PoolMetrics::record_time_to_mine(&tx_in_pool.time_to_mine, mined_op.entry_point);

let hash = tx_in_pool
.uo()
Expand Down Expand Up @@ -380,6 +420,7 @@ impl PoolInner {
let pool_op = OrderedPoolOperation {
po: op,
submission_id: submission_id.unwrap_or_else(|| self.next_submission_id()),
time_to_mine: TimeToMineInfo::new(),
};

// update counts
Expand Down Expand Up @@ -484,6 +525,7 @@ impl PoolInner {
struct OrderedPoolOperation {
po: Arc<PoolOperation>,
submission_id: u64,
time_to_mine: TimeToMineInfo,
}

impl OrderedPoolOperation {
Expand All @@ -494,6 +536,28 @@ impl OrderedPoolOperation {
fn mem_size(&self) -> usize {
std::mem::size_of::<Self>() + self.po.mem_size()
}

fn update_time_to_mine(
&mut self,
block_delta_time: Duration,
block_delta_height: u64,
candidate_gas_fees: GasFees,
base_fee: U256,
) -> bool {
let candidate_gas_price = base_fee + candidate_gas_fees.max_priority_fee_per_gas;
let uo_gas_price = cmp::min(
self.uo().max_fee_per_gas(),
self.uo().max_priority_fee_per_gas() + base_fee,
);

if uo_gas_price >= candidate_gas_price {
self.time_to_mine
.increase(block_delta_time, block_delta_height);
true
} else {
false
}
}
}

impl Eq for OrderedPoolOperation {}
Expand Down Expand Up @@ -521,6 +585,26 @@ impl PartialEq for OrderedPoolOperation {
}
}

#[derive(Debug, Clone)]
struct TimeToMineInfo {
candidate_for_blocks: u64,
candidate_for_time: Duration,
}

impl TimeToMineInfo {
fn new() -> Self {
Self {
candidate_for_blocks: 0,
candidate_for_time: Duration::default(),
}
}

fn increase(&mut self, block_delta_time: Duration, block_delta_height: u64) {
self.candidate_for_blocks += block_delta_height;
self.candidate_for_time += block_delta_time;
}
}

struct PoolMetrics {}

impl PoolMetrics {
Expand All @@ -530,12 +614,32 @@ impl PoolMetrics {
metrics::gauge!("op_pool_size_bytes", "entry_point" => entry_point.to_string())
.set(size_bytes as f64);
}

fn set_cache_metrics(num_ops: usize, size_bytes: isize, entry_point: Address) {
metrics::gauge!("op_pool_num_ops_in_cache", "entry_point" => entry_point.to_string())
.set(num_ops as f64);
metrics::gauge!("op_pool_cache_size_bytes", "entry_point" => entry_point.to_string())
.set(size_bytes as f64);
}

// Set the number of candidates in the pool, only changes on block boundaries
fn set_num_candidates(num_candidates: usize, entry_point: Address) {
metrics::gauge!("op_pool_num_candidates", "entry_point" => entry_point.to_string())
.set(num_candidates as f64);
}

fn record_time_to_mine(time_to_mine: &TimeToMineInfo, entry_point: Address) {
metrics::histogram!(
"op_pool_time_to_mine",
"entry_point" => entry_point.to_string()
)
.record(time_to_mine.candidate_for_time.as_millis() as f64);
metrics::histogram!(
"op_pool_blocks_to_mine",
"entry_point" => entry_point.to_string()
)
.record(time_to_mine.candidate_for_blocks as f64);
}
}

#[cfg(test)]
Expand Down Expand Up @@ -907,7 +1011,8 @@ mod tests {
pool.pool_size,
OrderedPoolOperation {
po: Arc::new(po1),
submission_id: 0
submission_id: 0,
time_to_mine: TimeToMineInfo::new()
}
.mem_size()
);
Expand Down Expand Up @@ -947,7 +1052,8 @@ mod tests {
pool.pool_size,
OrderedPoolOperation {
po: Arc::new(po2),
submission_id: 0
submission_id: 0,
time_to_mine: TimeToMineInfo::new(),
}
.mem_size()
);
Expand Down Expand Up @@ -979,7 +1085,7 @@ mod tests {
po1.valid_time_range.valid_until = Timestamp::from(1);
let _ = pool.add_operation(po1.clone()).unwrap();

let res = pool.remove_expired(Timestamp::from(2));
let res = pool.do_maintenance(0, Timestamp::from(2), GasFees::default(), 0.into());
assert_eq!(res.len(), 1);
assert_eq!(res[0].0, po1.uo.hash(conf.entry_point, conf.chain_id));
assert_eq!(res[0].1, Timestamp::from(1));
Expand All @@ -1001,7 +1107,8 @@ mod tests {
po3.valid_time_range.valid_until = 9.into();
let _ = pool.add_operation(po3.clone()).unwrap();

let res = pool.remove_expired(10.into());
let res = pool.do_maintenance(0, Timestamp::from(10), GasFees::default(), 0.into());

assert_eq!(res.len(), 2);
assert!(res.contains(&(po1.uo.hash(conf.entry_point, conf.chain_id), 5.into())));
assert!(res.contains(&(po3.uo.hash(conf.entry_point, conf.chain_id), 9.into())));
Expand All @@ -1022,6 +1129,7 @@ mod tests {
OrderedPoolOperation {
po: Arc::new(create_op(Address::random(), 1, 1)),
submission_id: 1,
time_to_mine: TimeToMineInfo::new(),
}
.mem_size()
}
Expand Down
77 changes: 52 additions & 25 deletions crates/pool/src/mempool/uo_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use rundler_types::{
pool::{
MempoolError, PaymasterMetadata, PoolOperation, Reputation, ReputationStatus, StakeStatus,
},
Entity, EntityUpdate, EntityUpdateType, EntryPointVersion, UserOperation, UserOperationId,
UserOperationVariant,
Entity, EntityUpdate, EntityUpdateType, EntryPointVersion, GasFees, UserOperation,
UserOperationId, UserOperationVariant,
};
use rundler_utils::emit::WithEntryPoint;
use tokio::sync::broadcast;
Expand Down Expand Up @@ -62,6 +62,8 @@ struct UoPoolState {
pool: PoolInner,
throttled_ops: HashSet<H256>,
block_number: u64,
gas_fees: GasFees,
base_fee: U256,
}

impl<UO, P, S, E> UoPool<UO, P, S, E>
Expand All @@ -84,6 +86,8 @@ where
pool: PoolInner::new(config.clone().into()),
throttled_ops: HashSet::new(),
block_number: 0,
gas_fees: GasFees::default(),
base_fee: U256::zero(),
}),
reputation,
paymaster,
Expand Down Expand Up @@ -297,38 +301,61 @@ where
})
}

// expire old UOs
let expired = state.pool.remove_expired(update.latest_block_timestamp);
// pool maintenance
let gas_fees = state.gas_fees;
let base_fee = state.base_fee;
let expired = state.pool.do_maintenance(
update.latest_block_number,
update.latest_block_timestamp,
gas_fees,
base_fee,
);

for (hash, until) in expired {
self.emit(OpPoolEvent::RemovedOp {
op_hash: hash,
reason: OpRemovalReason::Expired { valid_until: until },
})
}

state.block_number = update.latest_block_number;
}

// update required bundle fees and update metrics
if let Ok((bundle_fees, base_fee)) = self.prechecker.update_fees().await {
let max_fee = match format_units(bundle_fees.max_fee_per_gas, "gwei") {
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_max_fee_gwei(max_fee);

let max_priority_fee = match format_units(bundle_fees.max_priority_fee_per_gas, "gwei")
{
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_max_priority_fee_gwei(max_priority_fee);

let base_fee = match format_units(base_fee, "gwei") {
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_base_fee(base_fee);
match self.prechecker.update_fees().await {
Ok((bundle_fees, base_fee)) => {
let max_fee = match format_units(bundle_fees.max_fee_per_gas, "gwei") {
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_max_fee_gwei(max_fee);

let max_priority_fee =
match format_units(bundle_fees.max_priority_fee_per_gas, "gwei") {
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_max_priority_fee_gwei(max_priority_fee);

let base_fee_f64 = match format_units(base_fee, "gwei") {
Ok(s) => s.parse::<f64>().unwrap_or_default(),
Err(_) => 0.0,
};
UoPoolMetrics::current_base_fee(base_fee_f64);

// cache for the next update
{
let mut state = self.state.write();
state.block_number = update.latest_block_number;
state.gas_fees = bundle_fees;
state.base_fee = base_fee;
}
}
Err(e) => {
tracing::error!("Failed to update fees: {:?}", e);
{
let mut state = self.state.write();
state.block_number = update.latest_block_number;
}
}
}
}

Expand Down
Loading