Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add drop local user operation endpoint #610

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 17 additions & 2 deletions bin/rundler/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use builder::BuilderCliArgs;
use node::NodeCliArgs;
use pool::PoolCliArgs;
use rpc::RpcCliArgs;
use rundler_rpc::EthApiSettings;
use rundler_rpc::{EthApiSettings, RundlerApiSettings};
use rundler_sim::{
EstimationSettings, PrecheckSettings, PriorityFeeMode, SimulationSettings, MIN_CALL_GAS_LIMIT,
};
Expand Down Expand Up @@ -298,7 +298,7 @@ impl TryFrom<&CommonArgs> for EstimationSettings {
impl TryFrom<&CommonArgs> for PrecheckSettings {
type Error = anyhow::Error;

fn try_from(value: &CommonArgs) -> anyhow::Result<Self> {
fn try_from(value: &CommonArgs) -> Result<Self, Self::Error> {
Ok(Self {
max_verification_gas: value.max_verification_gas.into(),
max_total_execution_gas: value.max_bundle_gas.into(),
Expand Down Expand Up @@ -330,6 +330,21 @@ impl From<&CommonArgs> for EthApiSettings {
}
}

impl TryFrom<&CommonArgs> for RundlerApiSettings {
type Error = anyhow::Error;

fn try_from(value: &CommonArgs) -> Result<Self, Self::Error> {
Ok(Self {
priority_fee_mode: PriorityFeeMode::try_from(
value.priority_fee_mode_kind.as_str(),
value.priority_fee_mode_value,
)?,
bundle_priority_fee_overhead_percent: value.bundle_priority_fee_overhead_percent,
max_verification_gas: value.max_verification_gas,
})
}
}

/// CLI options for the metrics server
#[derive(Debug, Args)]
#[command(next_help_heading = "Metrics")]
Expand Down
1 change: 1 addition & 0 deletions bin/rundler/src/cli/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub async fn run(
(&common_args).try_into()?,
(&common_args).into(),
(&common_args).try_into()?,
(&common_args).try_into()?,
)?;

let (event_sender, event_rx) =
Expand Down
9 changes: 9 additions & 0 deletions bin/rundler/src/cli/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ pub struct PoolArgs {
default_value = "true"
)]
pub reputation_tracking_enabled: bool,

#[arg(
long = "pool.drop_min_num_blocks",
name = "pool.drop_min_num_blocks",
env = "POOL_DROP_MIN_NUM_BLOCKS",
default_value = "10"
)]
pub drop_min_num_blocks: u64,
}

impl PoolArgs {
Expand Down Expand Up @@ -182,6 +190,7 @@ impl PoolArgs {
throttled_entity_live_blocks: self.throttled_entity_live_blocks,
paymaster_tracking_enabled: self.paymaster_tracking_enabled,
reputation_tracking_enabled: self.reputation_tracking_enabled,
drop_min_num_blocks: self.drop_min_num_blocks,
};

Ok(PoolTaskArgs {
Expand Down
5 changes: 4 additions & 1 deletion bin/rundler/src/cli/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use anyhow::Context;
use clap::Args;
use rundler_builder::RemoteBuilderClient;
use rundler_pool::RemotePoolClient;
use rundler_rpc::{EthApiSettings, RpcTask, RpcTaskArgs};
use rundler_rpc::{EthApiSettings, RpcTask, RpcTaskArgs, RundlerApiSettings};
use rundler_sim::{EstimationSettings, PrecheckSettings};
use rundler_task::{server::connect_with_retries_shutdown, spawn_tasks_with_shutdown};
use rundler_types::chain::ChainSpec;
Expand Down Expand Up @@ -86,6 +86,7 @@ impl RpcArgs {
common: &CommonArgs,
precheck_settings: PrecheckSettings,
eth_api_settings: EthApiSettings,
rundler_api_settings: RundlerApiSettings,
estimation_settings: EstimationSettings,
) -> anyhow::Result<RpcTaskArgs> {
let apis = self
Expand All @@ -105,6 +106,7 @@ impl RpcArgs {
api_namespaces: apis,
precheck_settings,
eth_api_settings,
rundler_api_settings,
estimation_settings,
rpc_timeout: Duration::from_secs(self.timeout_seconds.parse()?),
max_connections: self.max_connections,
Expand Down Expand Up @@ -154,6 +156,7 @@ pub async fn run(
(&common_args).try_into()?,
(&common_args).into(),
(&common_args).try_into()?,
(&common_args).try_into()?,
)?;

let pool = connect_with_retries_shutdown(
Expand Down
25 changes: 25 additions & 0 deletions crates/pool/proto/op_pool/op_pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ service OpPool {
// Removes UserOperations from the mempool
rpc RemoveOps(RemoveOpsRequest) returns (RemoveOpsResponse);

// Remove a UserOperation by its id
rpc RemoveOpById(RemoveOpByIdRequest) returns (RemoveOpByIdResponse);

// Handles a list of updates to be performed on entities
rpc UpdateEntities(UpdateEntitiesRequest) returns (UpdateEntitiesResponse);

Expand Down Expand Up @@ -281,6 +284,21 @@ message RemoveOpsResponse {
}
message RemoveOpsSuccess {}

message RemoveOpByIdRequest {
bytes entry_point = 1;
bytes sender = 2;
bytes nonce = 3;
}
message RemoveOpByIdResponse {
oneof result {
RemoveOpByIdSuccess success = 1;
MempoolError failure = 2;
}
}
message RemoveOpByIdSuccess {
bytes hash = 1;
}

message UpdateEntitiesRequest {
// The serilaized entry point address
bytes entry_point = 1;
Expand Down Expand Up @@ -428,6 +446,7 @@ message MempoolError {
SenderAddressUsedAsAlternateEntity sender_address_used_as_alternate_entity = 13;
AssociatedStorageIsAlternateSender associated_storage_is_alternate_sender = 14;
PaymasterBalanceTooLow paymaster_balance_too_low = 15;
OperationDropTooSoon operation_drop_too_soon = 16;
}
}

Expand Down Expand Up @@ -474,6 +493,12 @@ message UnsupportedAggregatorError {

message InvalidSignatureError {}

message OperationDropTooSoon {
uint64 added_at = 1;
uint64 attempted_at = 2;
uint64 must_wait = 3;
}

// PRECHECK VIOLATIONS
message PrecheckViolationError {
oneof violation {
Expand Down
3 changes: 3 additions & 0 deletions crates/pool/src/mempool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ pub enum MempoolError {
/// An unknown entry point was specified
#[error("Unknown entry point {0}")]
UnknownEntryPoint(Address),
/// The operation drop attempt too soon after being added to the pool
#[error("Operation drop attempt too soon after being added to the pool. Added at {0}, attempted to drop at {1}, must wait {2} blocks.")]
OperationDropTooSoon(u64, u64, u64),
}

impl From<SimulationError> for MempoolError {
Expand Down
9 changes: 8 additions & 1 deletion crates/pool/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use ethers::types::{Address, H256, U256};
#[cfg(test)]
use mockall::automock;
use rundler_sim::{EntityInfos, MempoolConfig, PrecheckSettings, SimulationSettings};
use rundler_types::{Entity, EntityType, EntityUpdate, UserOperation, ValidTimeRange};
use rundler_types::{
Entity, EntityType, EntityUpdate, UserOperation, UserOperationId, ValidTimeRange,
};
use tonic::async_trait;
pub(crate) use uo_pool::UoPool;

Expand All @@ -63,6 +65,9 @@ pub trait Mempool: Send + Sync + 'static {
/// Removes a set of operations from the pool.
fn remove_operations(&self, hashes: &[H256]);

/// Removes an operation from the pool by its ID.
fn remove_op_by_id(&self, id: &UserOperationId) -> MempoolResult<Option<H256>>;

/// Updates the reputation of an entity.
fn update_entity(&self, entity_update: EntityUpdate);

Expand Down Expand Up @@ -150,6 +155,8 @@ pub struct PoolConfig {
pub paymaster_tracking_enabled: bool,
/// Boolean field used to toggle the operation of the reputation tracker
pub reputation_tracking_enabled: bool,
/// The minimum number of blocks a user operation must be in the mempool before it can be dropped
pub drop_min_num_blocks: u64,
}

/// Stake status structure
Expand Down
34 changes: 34 additions & 0 deletions crates/pool/src/mempool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ impl PoolInner {
self.by_hash.get(&hash).map(|o| o.po.clone())
}

pub(crate) fn get_operation_by_id(&self, id: &UserOperationId) -> Option<Arc<PoolOperation>> {
self.by_id.get(id).map(|o| o.po.clone())
}

pub(crate) fn remove_operation_by_hash(&mut self, hash: H256) -> Option<Arc<PoolOperation>> {
let ret = self.remove_operation_internal(hash, None);
self.update_metrics();
Expand Down Expand Up @@ -544,6 +548,36 @@ mod tests {
check_map_entry(pool.best.iter().next(), Some(&op));
}

#[test]
fn test_get_by_hash() {
let mut pool = PoolInner::new(conf());
let op = create_op(Address::random(), 0, 1);
let hash = pool.add_operation(op.clone()).unwrap();

let get_op = pool.get_operation_by_hash(hash).unwrap();
assert_eq!(op, *get_op);

assert_eq!(pool.get_operation_by_hash(H256::random()), None);
}

#[test]
fn test_get_by_id() {
let mut pool = PoolInner::new(conf());
let op = create_op(Address::random(), 0, 1);
pool.add_operation(op.clone()).unwrap();
let id = op.uo.id();

let get_op = pool.get_operation_by_id(&id).unwrap();
assert_eq!(op, *get_op);

let bad_id = UserOperationId {
sender: Address::random(),
nonce: 0.into(),
};

assert_eq!(pool.get_operation_by_id(&bad_id), None);
}

#[test]
fn add_multiple_ops() {
let mut pool = PoolInner::new(conf());
Expand Down
100 changes: 99 additions & 1 deletion crates/pool/src/mempool/uo_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use itertools::Itertools;
use parking_lot::RwLock;
use rundler_provider::{EntryPoint, PaymasterHelper};
use rundler_sim::{Prechecker, Simulator};
use rundler_types::{Entity, EntityUpdate, EntityUpdateType, UserOperation};
use rundler_types::{Entity, EntityUpdate, EntityUpdateType, UserOperation, UserOperationId};
use rundler_utils::emit::WithEntryPoint;
use tokio::sync::broadcast;
use tonic::async_trait;
Expand Down Expand Up @@ -505,6 +505,45 @@ where
UoPoolMetrics::increment_removed_operations(count, self.config.entry_point);
}

fn remove_op_by_id(&self, id: &UserOperationId) -> MempoolResult<Option<H256>> {
// Check for the operation in the pool and its age
let po = {
let state = self.state.read();
match state.pool.get_operation_by_id(id) {
Some(po) => {
if po.sim_block_number + self.config.drop_min_num_blocks > state.block_number {
return Err(MempoolError::OperationDropTooSoon(
po.sim_block_number,
state.block_number,
self.config.drop_min_num_blocks,
));
}
po
}
None => return Ok(None),
}
};

let hash = po.uo.op_hash(self.config.entry_point, self.config.chain_id);

// This can return none if the operation was removed by another thread
if self
.state
.write()
.pool
.remove_operation_by_hash(hash)
.is_none()
{
return Ok(None);
}

self.emit(OpPoolEvent::RemovedOp {
op_hash: hash,
reason: OpRemovalReason::Requested,
});
Ok(Some(hash))
}

fn update_entity(&self, update: EntityUpdate) {
let entity = update.entity;
match update.update_type {
Expand Down Expand Up @@ -1227,6 +1266,64 @@ mod tests {
assert_eq!(pool_op.uo, op.op);
}

#[tokio::test]
async fn test_remove_by_id_too_soon() {
let op = create_op(Address::random(), 0, 0, None);
let pool = create_pool(vec![op.clone()]);

let _ = pool
.add_operation(OperationOrigin::Local, op.op.clone())
.await
.unwrap();

assert!(matches!(
pool.remove_op_by_id(&op.op.id()),
Err(MempoolError::OperationDropTooSoon(_, _, _))
));
check_ops(pool.best_operations(1, 0).unwrap(), vec![op.op]);
}

#[tokio::test]
async fn test_remove_by_id_not_found() {
let op = create_op(Address::random(), 0, 0, None);
let pool = create_pool(vec![op.clone()]);

let _ = pool
.add_operation(OperationOrigin::Local, op.op.clone())
.await
.unwrap();

assert!(matches!(
pool.remove_op_by_id(&UserOperationId {
sender: Address::random(),
nonce: 0.into()
}),
Ok(None)
));
check_ops(pool.best_operations(1, 0).unwrap(), vec![op.op]);
}

#[tokio::test]
async fn test_remove_by_id() {
let op = create_op(Address::random(), 0, 0, None);
let pool = create_pool(vec![op.clone()]);

let _ = pool
.add_operation(OperationOrigin::Local, op.op.clone())
.await
.unwrap();
let hash = op.op.op_hash(pool.config.entry_point, 1);

pool.on_chain_update(&ChainUpdate {
latest_block_number: 11,
..Default::default()
})
.await;

assert_eq!(pool.remove_op_by_id(&op.op.id()).unwrap().unwrap(), hash);
check_ops(pool.best_operations(1, 0).unwrap(), vec![]);
}

#[tokio::test]
async fn test_get_user_op_by_hash_not_found() {
let op = create_op(Address::random(), 0, 0, None);
Expand Down Expand Up @@ -1289,6 +1386,7 @@ mod tests {
throttled_entity_live_blocks: 10,
paymaster_tracking_enabled: true,
reputation_tracking_enabled: true,
drop_min_num_blocks: 10,
};

let mut simulator = MockSimulator::new();
Expand Down
Loading
Loading