Skip to content

Commit

Permalink
feat(pool): add basic structure for update entities pool operation
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-miao committed Oct 31, 2023
1 parent ed31bd8 commit 282087d
Show file tree
Hide file tree
Showing 12 changed files with 334 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ repository = "https://github.com/alchemyplatform/rundler"
[workspace.dependencies]
anyhow = "1.0.70"
async-trait = "0.1.73"
cargo-husky = { version = "1", default-features = false, features = ["user-hooks" ] }
cargo-husky = { version = "1", default-features = false, features = ["user-hooks"] }
ethers = "2.0.8"
futures = "0.3.28"
futures-util = "0.3.28"
Expand Down
31 changes: 31 additions & 0 deletions crates/pool/proto/op_pool/op_pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ message Entity {
bytes address = 2;
}

// The type of update to perform on an entity
enum EntityUpdateType {
ENTITY_UPDATE_TYPE_UNSPECIFIED = 0;
ENTITY_UPDATE_TYPE_UNSTAKED_INVALIDATION = 1;
ENTITY_UPDATE_TYPE_STAKED_INVALIDATION = 2;
}

// A tuple consisting of an entity and what kind of update to perform on it
message EntityUpdate {
Entity entity = 1;
EntityUpdateType update_type = 2;
}

// Defines a UserOperation persisted in a local mempool
message MempoolOp {
UserOperation uo = 1;
Expand Down Expand Up @@ -119,6 +132,9 @@ service OpPool {
// from the mempool
rpc RemoveEntities(RemoveEntitiesRequest) returns (RemoveEntitiesResponse);

// Handles a list of updates to be performed on entities
rpc UpdateEntities(UpdateEntitiesRequest) returns (UpdateEntitiesResponse);

// Clears the bundler mempool and reputation data of paymasters/accounts/factories/aggregators
rpc DebugClearState (DebugClearStateRequest) returns (DebugClearStateResponse);
// Dumps the current UserOperations mempool
Expand Down Expand Up @@ -208,6 +224,21 @@ message RemoveEntitiesResponse {
}
message RemoveEntitiesSuccess {}

message UpdateEntitiesRequest {
// The serilaized entry point address
bytes entry_point = 1;

// A list of updates that should be performed on the entities
repeated EntityUpdate entity_updates = 2;
}
message UpdateEntitiesResponse {
oneof result {
UpdateEntitiesSuccess success = 1;
MempoolError failure = 2;
}
}
message UpdateEntitiesSuccess {}

message DebugClearStateRequest {}
message DebugClearStateResponse {
oneof result {
Expand Down
5 changes: 4 additions & 1 deletion crates/pool/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use ethers::types::{Address, H256};
#[cfg(test)]
use mockall::automock;
use rundler_sim::{MempoolConfig, PrecheckSettings, SimulationSettings};
use rundler_types::{Entity, EntityType, UserOperation, ValidTimeRange};
use rundler_types::{Entity, EntityType, EntityUpdate, UserOperation, ValidTimeRange};
use strum::IntoEnumIterator;
use tonic::async_trait;
pub(crate) use uo_pool::UoPool;
Expand Down Expand Up @@ -63,6 +63,9 @@ pub trait Mempool: Send + Sync + 'static {
/// Removes all operations associated with a given entity from the pool.
fn remove_entity(&self, entity: Entity);

/// Updates the reputation of an entity.
fn update_entity(&self, entity_update: EntityUpdate);

/// Returns the best operations from the pool.
///
/// Returns the best operations from the pool based on their gas bids up to
Expand Down
58 changes: 58 additions & 0 deletions crates/pool/src/mempool/reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ pub(crate) trait ReputationManager: Send + Sync + 'static {
/// pool
fn add_seen(&self, address: Address);

/// Called by mempool when an unstaked entity causes the invalidation of a bundle
fn handle_urep_030_penalty(&self, address: Address);

/// Called by mempool when a staked entity causes the invalidation of a bundle
fn handle_srep_050_penalty(&self, address: Address);

/// Called by the mempool when an operation that requires stake is removed
/// from the pool
fn add_included(&self, address: Address);
Expand All @@ -101,6 +107,9 @@ pub(crate) trait ReputationManager: Send + Sync + 'static {

/// Called by debug API
fn set_reputation(&self, address: Address, ops_seen: u64, ops_included: u64);

/// Get the ops allowed for an unstaked entity
fn get_ops_allowed(&self, address: Address) -> u64;
}

#[derive(Debug)]
Expand Down Expand Up @@ -142,6 +151,14 @@ impl ReputationManager for HourlyMovingAverageReputation {
self.reputation.write().add_seen(address);
}

fn handle_urep_030_penalty(&self, address: Address) {
self.reputation.write().handle_urep_030_penalty(address);
}

fn handle_srep_050_penalty(&self, address: Address) {
self.reputation.write().handle_srep_050_penalty(address);
}

fn add_included(&self, address: Address) {
self.reputation.write().add_included(address);
}
Expand Down Expand Up @@ -169,19 +186,31 @@ impl ReputationManager for HourlyMovingAverageReputation {
.write()
.set_reputation(address, ops_seen, ops_included)
}

fn get_ops_allowed(&self, address: Address) -> u64 {
self.reputation.read().get_ops_allowed(address)
}
}

#[derive(Debug, Clone, Copy)]
pub(crate) struct ReputationParams {
bundle_invalidation_ops_seen_staked_penalty: u64,
bundle_invalidation_ops_seen_unstaked_penalty: u64,
same_unstaked_entity_mempool_count: u64,
min_inclusion_rate_denominator: u64,
inclusion_rate_factor: u64,
throttling_slack: u64,
ban_slack: u64,
}

impl ReputationParams {
pub(crate) fn bundler_default() -> Self {
Self {
bundle_invalidation_ops_seen_staked_penalty: 10_000,
bundle_invalidation_ops_seen_unstaked_penalty: 1_000,
same_unstaked_entity_mempool_count: 10,
min_inclusion_rate_denominator: 10,
inclusion_rate_factor: 10,
throttling_slack: 10,
ban_slack: 50,
}
Expand All @@ -190,7 +219,11 @@ impl ReputationParams {
#[allow(dead_code)]
pub(crate) fn client_default() -> Self {
Self {
bundle_invalidation_ops_seen_staked_penalty: 10_000,
bundle_invalidation_ops_seen_unstaked_penalty: 1_000,
same_unstaked_entity_mempool_count: 10,
min_inclusion_rate_denominator: 100,
inclusion_rate_factor: 10,
throttling_slack: 10,
ban_slack: 50,
}
Expand Down Expand Up @@ -252,6 +285,17 @@ impl AddressReputation {
count.ops_seen += 1;
}

fn handle_urep_030_penalty(&mut self, address: Address) {
let count = self.counts.entry(address).or_default();
count.ops_seen += self.params.bundle_invalidation_ops_seen_unstaked_penalty;
}

fn handle_srep_050_penalty(&mut self, address: Address) {
let count = self.counts.entry(address).or_default();
// According to the spec we set ops_seen here instead of incrementing it
count.ops_seen = self.params.bundle_invalidation_ops_seen_staked_penalty;
}

fn add_included(&mut self, address: Address) {
let count = self.counts.entry(address).or_default();
count.ops_included += 1;
Expand All @@ -268,6 +312,20 @@ impl AddressReputation {
count.ops_included = ops_included;
}

fn get_ops_allowed(&self, address: Address) -> u64 {
let count = self.counts.get(&address).unwrap();
let inclusion_based_count = if count.ops_seen == 0 {
// make sure we aren't dividing by 0
0
} else {
count.ops_included * self.params.inclusion_rate_factor / count.ops_seen
+ std::cmp::min(count.ops_included, 10_000)
};

// return ops allowed, as defined by UREP-020
self.params.same_unstaked_entity_mempool_count + inclusion_based_count
}

fn hourly_update(&mut self) {
for count in self.counts.values_mut() {
count.ops_seen -= count.ops_seen / 24;
Expand Down
43 changes: 42 additions & 1 deletion crates/pool/src/mempool/uo_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use ethers::types::{Address, H256, U256};
use itertools::Itertools;
use parking_lot::RwLock;
use rundler_sim::{Prechecker, Simulator};
use rundler_types::{Entity, UserOperation};
use rundler_types::{Entity, EntityUpdate, EntityUpdateType, UserOperation};
use rundler_utils::emit::WithEntryPoint;
use tokio::sync::broadcast;
use tonic::async_trait;
Expand Down Expand Up @@ -328,6 +328,18 @@ where
UoPoolMetrics::increment_removed_entities(self.entry_point);
}

fn update_entity(&self, update: EntityUpdate) {
let entity = update.entity;
match update.update_type {
EntityUpdateType::UnstakedInvalidation => {
self.reputation.handle_urep_030_penalty(entity.address);
}
EntityUpdateType::StakedInvalidation => {
self.reputation.handle_srep_050_penalty(entity.address);
}
}
}

fn best_operations(
&self,
max: usize,
Expand Down Expand Up @@ -897,6 +909,10 @@ mod tests {

#[derive(Default, Clone)]
struct MockReputationManager {
bundle_invalidation_ops_seen_staked_penalty: u64,
bundle_invalidation_ops_seen_unstaked_penalty: u64,
same_unstaked_entity_mempool_count: u64,
inclusion_rate_factor: u64,
throttling_slack: u64,
ban_slack: u64,
counts: Arc<RwLock<Counts>>,
Expand Down Expand Up @@ -938,6 +954,16 @@ mod tests {
*self.counts.write().seen.entry(address).or_default() += 1;
}

fn handle_srep_050_penalty(&self, address: Address) {
*self.counts.write().seen.entry(address).or_default() =
self.bundle_invalidation_ops_seen_staked_penalty;
}

fn handle_urep_030_penalty(&self, address: Address) {
*self.counts.write().seen.entry(address).or_default() +=
self.bundle_invalidation_ops_seen_unstaked_penalty;
}

fn add_included(&self, address: Address) {
*self.counts.write().included.entry(address).or_default() += 1;
}
Expand Down Expand Up @@ -967,5 +993,20 @@ mod tests {
counts.seen.insert(address, ops_seen);
counts.included.insert(address, ops_included);
}

fn get_ops_allowed(&self, address: Address) -> u64 {
let counts = self.counts.read();
let seen = *counts.seen.get(&address).unwrap();
let included = *counts.included.get(&address).unwrap();
let inclusion_based_count = if seen == 0 {
// make sure we aren't dividing by 0
0
} else {
included * self.inclusion_rate_factor / seen + std::cmp::min(included, 10_000)
};

// return ops allowed, as defined by UREP-020
self.same_unstaked_entity_mempool_count + inclusion_based_count
}
}
}
41 changes: 40 additions & 1 deletion crates/pool/src/server/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use async_trait::async_trait;
use ethers::types::{Address, H256};
use futures_util::Stream;
use rundler_task::server::{HealthCheck, ServerStatus};
use rundler_types::{Entity, UserOperation};
use rundler_types::{Entity, EntityUpdate, UserOperation};
use tokio::{
sync::{broadcast, mpsc, oneshot},
task::JoinHandle,
Expand Down Expand Up @@ -170,6 +170,22 @@ impl PoolServer for LocalPoolHandle {
}
}

async fn update_entities(
&self,
entry_point: Address,
entity_updates: Vec<EntityUpdate>,
) -> PoolResult<()> {
let req = ServerRequestKind::UpdateEntities {
entry_point,
entity_updates,
};
let resp = self.send(req).await?;
match resp {
ServerResponse::UpdateEntities => Ok(()),
_ => Err(PoolServerError::UnexpectedResponse),
}
}

async fn debug_clear_state(&self) -> Result<(), PoolServerError> {
let req = ServerRequestKind::DebugClearState;
let resp = self.send(req).await?;
Expand Down Expand Up @@ -307,6 +323,18 @@ where
Ok(())
}

fn update_entities<'a>(
&self,
entry_point: Address,
entity_updates: impl IntoIterator<Item = &'a EntityUpdate>,
) -> PoolResult<()> {
let mempool = self.get_pool(entry_point)?;
for update in entity_updates {
mempool.update_entity(*update);
}
Ok(())
}

fn debug_clear_state(&self) -> PoolResult<()> {
for mempool in self.mempools.values() {
mempool.clear();
Expand Down Expand Up @@ -407,6 +435,12 @@ where
Err(e) => Err(e),
}
},
ServerRequestKind::UpdateEntities { entry_point, entity_updates } => {
match self.update_entities(entry_point, &entity_updates) {
Ok(_) => Ok(ServerResponse::UpdateEntities),
Err(e) => Err(e),
}
},
ServerRequestKind::DebugClearState => {
match self.debug_clear_state() {
Ok(_) => Ok(ServerResponse::DebugClearState),
Expand Down Expand Up @@ -473,6 +507,10 @@ enum ServerRequestKind {
entry_point: Address,
entities: Vec<Entity>,
},
UpdateEntities {
entry_point: Address,
entity_updates: Vec<EntityUpdate>,
},
DebugClearState,
DebugDumpMempool {
entry_point: Address,
Expand Down Expand Up @@ -500,6 +538,7 @@ enum ServerResponse {
},
RemoveOps,
RemoveEntities,
UpdateEntities,
DebugClearState,
DebugDumpMempool {
ops: Vec<PoolOperation>,
Expand Down
9 changes: 8 additions & 1 deletion crates/pool/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use local::{LocalPoolBuilder, LocalPoolHandle};
use mockall::automock;
pub(crate) use remote::spawn_remote_mempool_server;
pub use remote::RemotePoolClient;
use rundler_types::{Entity, UserOperation};
use rundler_types::{Entity, EntityUpdate, UserOperation};

use crate::mempool::{PoolOperation, Reputation};

Expand Down Expand Up @@ -72,6 +72,13 @@ pub trait PoolServer: Send + Sync + 'static {
/// Remove operations associated with entities from the pool
async fn remove_entities(&self, entry_point: Address, entities: Vec<Entity>) -> PoolResult<()>;

/// Update operations associated with entities from the pool
async fn update_entities(
&self,
entry_point: Address,
entities: Vec<EntityUpdate>,
) -> PoolResult<()>;

/// Subscribe to new chain heads from the pool.
///
/// The pool will notify the subscriber when a new chain head is received, and the pool
Expand Down
Loading

0 comments on commit 282087d

Please sign in to comment.