Skip to content

Commit

Permalink
feat: add debug rpc method to dump paymaster balances from pool
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Feb 21, 2024
1 parent 06b76f2 commit c635e0a
Show file tree
Hide file tree
Showing 14 changed files with 465 additions and 104 deletions.
21 changes: 21 additions & 0 deletions crates/pool/proto/op_pool/op_pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ service OpPool {
// debug_bundler_setReputation
rpc DebugDumpReputation(DebugDumpReputationRequest) returns (DebugDumpReputationResponse);

// Dumps the paymaster balances
rpc DebugDumpPaymasterBalances(DebugDumpPaymasterBalancesRequest) returns (DebugDumpPaymasterBalancesResponse);

// Get reputation status of address
rpc GetReputationStatus(GetReputationStatusRequest) returns (GetReputationStatusResponse);

Expand Down Expand Up @@ -347,6 +350,24 @@ message DebugDumpReputationSuccess {
repeated Reputation reputations = 1;
}

message DebugDumpPaymasterBalancesRequest {
bytes entry_point = 1;
}
message DebugDumpPaymasterBalancesResponse {
oneof result {
DebugDumpPaymasterBalancesSuccess success = 1;
MempoolError failure = 2;
}
}
message DebugDumpPaymasterBalancesSuccess {
repeated PaymasterBalance balances = 1;
}
message PaymasterBalance {
bytes address = 1;
bytes pending_balance = 2;
bytes confirmed_balance = 3;
}

message SubscribeNewHeadsRequest {}
message SubscribeNewHeadsResponse {
// The new chain head
Expand Down
7 changes: 5 additions & 2 deletions crates/pool/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ pub trait Mempool: Send + Sync + 'static {
/// Dumps the mempool's reputation tracking
fn dump_reputation(&self) -> Vec<Reputation>;

/// Dumps the mempool's paymaster balance cache
fn dump_paymaster_balances(&self) -> Vec<PaymasterMetadata>;

/// Dumps the mempool's reputation tracking
fn get_reputation_status(&self, address: Address) -> ReputationStatus;

Expand All @@ -106,7 +109,7 @@ pub trait Mempool: Send + Sync + 'static {
/// Get stake status for address
async fn get_stake_status(&self, address: Address) -> MempoolResult<StakeStatus>;

/// Reset paymater state
/// Reset paymaster state
async fn reset_confirmed_paymaster_balances(&self) -> MempoolResult<()>;

/// Turns on and off tracking errors
Expand Down Expand Up @@ -163,7 +166,7 @@ pub struct StakeStatus {

#[derive(Debug, Clone, Copy)]
pub struct StakeInfo {
/// Stake ammount
/// Stake amount
pub stake: u128,
/// Unstake delay in seconds
pub unstake_delay_sec: u32,
Expand Down
11 changes: 11 additions & 0 deletions crates/pool/src/mempool/paymaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ impl PaymasterTracker {
None
}

pub(crate) fn dump_paymaster_metadata(&self) -> Vec<PaymasterMetadata> {
self.paymaster_balances
.iter()
.map(|(address, balance)| PaymasterMetadata {
pending_balance: balance.pending_balance(),
confirmed_balance: balance.confirmed,
address: *address,
})
.collect()
}

pub(crate) fn unmine_actual_cost(&mut self, paymaster: &Address, actual_cost: U256) {
if let Some(paymaster_balance) = self.paymaster_balances.get_mut(paymaster) {
paymaster_balance.confirmed = paymaster_balance.confirmed.saturating_add(actual_cost);
Expand Down
4 changes: 4 additions & 0 deletions crates/pool/src/mempool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ impl PoolInner {
self.paymaster_balances.paymaster_metadata(paymaster)
}

pub(crate) fn dump_paymaster_metadata(&self) -> Vec<PaymasterMetadata> {
self.paymaster_balances.dump_paymaster_metadata()
}

pub(crate) fn paymaster_exists(&self, paymaster: Address) -> bool {
self.paymaster_balances.paymaster_exists(paymaster)
}
Expand Down
4 changes: 4 additions & 0 deletions crates/pool/src/mempool/uo_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,10 @@ where
self.reputation.dump_reputation()
}

fn dump_paymaster_balances(&self) -> Vec<PaymasterMetadata> {
self.state.read().pool.dump_paymaster_metadata()
}

fn get_reputation_status(&self, address: Address) -> ReputationStatus {
self.reputation.status(address)
}
Expand Down
128 changes: 90 additions & 38 deletions crates/pool/src/server/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use std::{collections::HashMap, pin::Pin, sync::Arc};
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};

use async_stream::stream;
use async_trait::async_trait;
Expand All @@ -29,7 +29,9 @@ use tracing::error;
use super::{PoolResult, PoolServerError};
use crate::{
chain::ChainUpdate,
mempool::{Mempool, MempoolError, OperationOrigin, PoolOperation, StakeStatus},
mempool::{
Mempool, MempoolError, OperationOrigin, PaymasterMetadata, PoolOperation, StakeStatus,
},
server::{NewHead, PoolServer, Reputation},
ReputationStatus,
};
Expand Down Expand Up @@ -254,6 +256,18 @@ impl PoolServer for LocalPoolHandle {
}
}

async fn debug_dump_paymaster_balances(
&self,
entry_point: Address,
) -> PoolResult<Vec<PaymasterMetadata>> {
let req = ServerRequestKind::DebugDumpPaymasterBalances { entry_point };
let resp = self.send(req).await?;
match resp {
ServerResponse::DebugDumpPaymasterBalances { balances } => Ok(balances),
_ => Err(PoolServerError::UnexpectedResponse),
}
}

async fn get_stake_status(
&self,
entry_point: Address,
Expand Down Expand Up @@ -438,6 +452,14 @@ where
Ok(mempool.dump_reputation())
}

fn debug_dump_paymaster_balances(
&self,
entry_point: Address,
) -> PoolResult<Vec<PaymasterMetadata>> {
let mempool = self.get_pool(entry_point)?;
Ok(mempool.dump_paymaster_balances())
}

fn get_reputation_status(
&self,
entry_point: Address,
Expand All @@ -447,6 +469,28 @@ where
Ok(mempool.get_reputation_status(address))
}

fn get_pool_and_spawn<F, Fut>(
&self,
entry_point: Address,
response: oneshot::Sender<Result<ServerResponse, PoolServerError>>,
f: F,
) where
F: FnOnce(Arc<M>, oneshot::Sender<Result<ServerResponse, PoolServerError>>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
match self.get_pool(entry_point) {
Ok(mempool) => {
let mempool = Arc::clone(mempool);
tokio::spawn(f(mempool, response));
}
Err(e) => {
if let Err(e) = response.send(Err(e)) {
tracing::error!("Failed to send response: {:?}", e);
}
}
}
}

async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> {
loop {
tokio::select! {
Expand All @@ -473,29 +517,43 @@ where
}
Some(req) = self.req_receiver.recv() => {
let resp = match req.request {
// Async methods
// Responses are sent in the spawned task
ServerRequestKind::AddOp { entry_point, op, origin } => {
let fut = |mempool: Arc<M>, response: oneshot::Sender<Result<ServerResponse, PoolServerError>>| async move {
let resp = match mempool.add_operation(origin, op).await {
Ok(hash) => Ok(ServerResponse::AddOp { hash }),
Err(e) => Err(e.into()),
};
if let Err(e) = response.send(resp) {
tracing::error!("Failed to send response: {:?}", e);
}
};

self.get_pool_and_spawn(entry_point, req.response, fut);
continue;
},
ServerRequestKind::GetStakeStatus { entry_point, address }=> {
let fut = |mempool: Arc<M>, response: oneshot::Sender<Result<ServerResponse, PoolServerError>>| async move {
let resp = match mempool.get_stake_status(address).await {
Ok(status) => Ok(ServerResponse::GetStakeStatus { status }),
Err(e) => Err(e.into()),
};
if let Err(e) = response.send(resp) {
tracing::error!("Failed to send response: {:?}", e);
}
};
self.get_pool_and_spawn(entry_point, req.response, fut);
continue;
},

// Sync methods
// Responses are sent in the main loop below
ServerRequestKind::GetSupportedEntryPoints => {
Ok(ServerResponse::GetSupportedEntryPoints {
entry_points: self.mempools.keys().copied().collect()
})
},
ServerRequestKind::AddOp { entry_point, op, origin } => {
match self.get_pool(entry_point) {
Ok(mempool) => {
let mempool = Arc::clone(mempool);
tokio::spawn(async move {
let resp = match mempool.add_operation(origin, op).await {
Ok(hash) => Ok(ServerResponse::AddOp { hash }),
Err(e) => Err(e.into()),
};
if let Err(e) = req.response.send(resp) {
tracing::error!("Failed to send response: {:?}", e);
}
});
continue;
},
Err(e) => Err(e),
}
},
ServerRequestKind::GetOps { entry_point, max_ops, shard_index } => {
match self.get_ops(entry_point, max_ops, shard_index) {
Ok(ops) => Ok(ServerResponse::GetOps { ops }),
Expand Down Expand Up @@ -550,27 +608,15 @@ where
Err(e) => Err(e),
}
},
ServerRequestKind::GetReputationStatus{ entry_point, address } => {
match self.get_reputation_status(entry_point, address) {
Ok(status) => Ok(ServerResponse::GetReputationStatus { status }),
ServerRequestKind::DebugDumpPaymasterBalances { entry_point } => {
match self.debug_dump_paymaster_balances(entry_point) {
Ok(balances) => Ok(ServerResponse::DebugDumpPaymasterBalances { balances }),
Err(e) => Err(e),
}
},
ServerRequestKind::GetStakeStatus { entry_point, address }=> {
match self.get_pool(entry_point) {
Ok(mempool) => {
let mempool = Arc::clone(mempool);
tokio::spawn(async move {
let resp = match mempool.get_stake_status(address).await {
Ok(status) => Ok(ServerResponse::GetStakeStatus { status }),
Err(e) => Err(e.into()),
};
if let Err(e) = req.response.send(resp) {
tracing::error!("Failed to send response: {:?}", e);
}
});
continue;
},
ServerRequestKind::GetReputationStatus{ entry_point, address } => {
match self.get_reputation_status(entry_point, address) {
Ok(status) => Ok(ServerResponse::GetReputationStatus { status }),
Err(e) => Err(e),
}
},
Expand Down Expand Up @@ -639,6 +685,9 @@ enum ServerRequestKind {
DebugDumpReputation {
entry_point: Address,
},
DebugDumpPaymasterBalances {
entry_point: Address,
},
GetReputationStatus {
entry_point: Address,
address: Address,
Expand Down Expand Up @@ -675,6 +724,9 @@ enum ServerResponse {
DebugDumpReputation {
reputations: Vec<Reputation>,
},
DebugDumpPaymasterBalances {
balances: Vec<PaymasterMetadata>,
},
GetReputationStatus {
status: ReputationStatus,
},
Expand Down
34 changes: 20 additions & 14 deletions crates/pool/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use remote::RemotePoolClient;
use rundler_types::{EntityUpdate, UserOperation};

use crate::{
mempool::{PoolOperation, Reputation, StakeStatus},
mempool::{PaymasterMetadata, PoolOperation, Reputation, StakeStatus},
ReputationStatus,
};

Expand Down Expand Up @@ -90,6 +90,20 @@ pub trait PoolServer: Send + Sync + 'static {
/// has processed all operations up to that head.
async fn subscribe_new_heads(&self) -> PoolResult<Pin<Box<dyn Stream<Item = NewHead> + Send>>>;

/// Get reputation status given entrypoint and address
async fn get_reputation_status(
&self,
entry_point: Address,
address: Address,
) -> PoolResult<ReputationStatus>;

/// Get stake status given entrypoint and address
async fn get_stake_status(
&self,
entry_point: Address,
address: Address,
) -> PoolResult<StakeStatus>;

/// Clear the pool state, used for debug methods
async fn debug_clear_state(
&self,
Expand All @@ -108,22 +122,14 @@ pub trait PoolServer: Send + Sync + 'static {
reputations: Vec<Reputation>,
) -> PoolResult<()>;

/// Get reputation status given entrypoint and address
async fn get_reputation_status(
&self,
entry_point: Address,
address: Address,
) -> PoolResult<ReputationStatus>;
/// Dump reputations for entities, used for debug methods
async fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult<Vec<Reputation>>;

/// Get stake status given entrypoint and address
async fn get_stake_status(
/// Dump paymaster balances, used for debug methods
async fn debug_dump_paymaster_balances(
&self,
entry_point: Address,
address: Address,
) -> PoolResult<StakeStatus>;

/// Dump reputations for entities, used for debug methods
async fn debug_dump_reputation(&self, entry_point: Address) -> PoolResult<Vec<Reputation>>;
) -> PoolResult<Vec<PaymasterMetadata>>;

/// Controls whether or not the certain tracking data structures are used to block user operations
async fn admin_set_tracking(
Expand Down
Loading

0 comments on commit c635e0a

Please sign in to comment.