Skip to content

Commit

Permalink
fix: fallback for unknown node client (#1017)
Browse files Browse the repository at this point in the history
  • Loading branch information
k1rill-fedoseev authored Aug 16, 2024
1 parent 5faf103 commit afa1688
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
indexer::{
common_transport::CommonTransport,
rpc_utils::{CallTracer, TraceType},
rpc_utils::{to_string, CallTracer, TraceType},
settings::IndexerSettings,
},
repository,
Expand All @@ -12,7 +12,7 @@ use ethers::prelude::{
abi::{AbiEncode, Error},
parse_log,
types::{Address, Bytes, Filter, Log, TransactionReceipt},
EthEvent, Middleware, Provider, ProviderError, H256,
EthEvent, Middleware, NodeClient, Provider, ProviderError, H256,
};
use futures::{
stream,
Expand Down Expand Up @@ -122,6 +122,10 @@ impl Indexer {

#[instrument(name = "indexer", skip_all, level = "info", fields(version = L::version()))]
pub async fn start<L: IndexerLogic>(&self) -> anyhow::Result<()> {
tracing::debug!("fetching node client");
let variant = self.client.node_client().await.unwrap_or(NodeClient::Geth);
tracing::info!(variant = to_string(variant), "fetched node client");

let mut stream_jobs = stream::SelectAll::<BoxStream<Job>>::new();

if self.settings.realtime.enabled {
Expand Down Expand Up @@ -209,7 +213,7 @@ impl Indexer {
stream_txs
.for_each_concurrent(Some(self.settings.concurrency as usize), |tx| async move {
let mut backoff = vec![5, 20, 120].into_iter().map(Duration::from_secs);
while let Err(err) = &self.handle_tx::<L>(tx).await {
while let Err(err) = &self.handle_tx::<L>(tx, variant).await {
match backoff.next() {
None => {
tracing::error!(error = ?err, tx_hash = ?tx, "tx handler failed, skipping");
Expand Down Expand Up @@ -276,8 +280,12 @@ impl Indexer {
.flat_map(stream::iter)
}

#[instrument(name = "indexer::handle_tx", skip(self), level = "info")]
async fn handle_tx<L: IndexerLogic>(&self, tx_hash: H256) -> anyhow::Result<()> {
#[instrument(name = "indexer::handle_tx", skip(self, variant), level = "info")]
async fn handle_tx<L: IndexerLogic>(
&self,
tx_hash: H256,
variant: NodeClient,
) -> anyhow::Result<()> {
let tx = self
.client
.get_transaction(tx_hash)
Expand Down Expand Up @@ -311,7 +319,7 @@ impl Indexer {
"tx contains more than one bundle or was sent indirectly, fetching tx trace"
);
self.client
.common_trace_transaction(tx_hash)
.common_trace_transaction(tx_hash, variant)
.await?
.into_iter()
.filter_map(|t| {
Expand Down Expand Up @@ -397,7 +405,10 @@ mod tests {
db.clone(),
Default::default(),
);
indexer.handle_tx::<v06::IndexerV06>(tx_hash).await.unwrap();
indexer
.handle_tx::<v06::IndexerV06>(tx_hash, NodeClient::Geth)
.await
.unwrap();

let op_hash =
H256::from_str("0x2d5f7a884e9a99cfe2445db2af140a8851fbd860852b668f2f199190f68adf87")
Expand Down Expand Up @@ -465,7 +476,10 @@ mod tests {
db.clone(),
Default::default(),
);
indexer.handle_tx::<v07::IndexerV07>(tx_hash).await.unwrap();
indexer
.handle_tx::<v07::IndexerV07>(tx_hash, NodeClient::Geth)
.await
.unwrap();

let op_hash =
H256::from_str("0x02bfece5db8c1bd400049c14e20ee988e62c057d296e9aefa34bd9b7f146033e")
Expand Down
17 changes: 14 additions & 3 deletions user-ops-indexer/user-ops-indexer-logic/src/indexer/rpc_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub trait CallTracer {
async fn common_trace_transaction(
&self,
tx_hash: TxHash,
variant: NodeClient,
) -> Result<Vec<CommonCallTrace>, ProviderError>;
}

Expand All @@ -35,10 +36,9 @@ impl<T: JsonRpcClient> CallTracer for Provider<T> {
async fn common_trace_transaction(
&self,
tx_hash: TxHash,
variant: NodeClient,
) -> Result<Vec<CommonCallTrace>, ProviderError> {
let client = self.node_client().await?;

match client {
match variant {
NodeClient::Geth => {
let geth_trace = self
.debug_trace_transaction(
Expand Down Expand Up @@ -132,6 +132,17 @@ fn flatten_geth_trace(root: CallFrame) -> Vec<CommonCallTrace> {
res
}

pub fn to_string(node_client: NodeClient) -> String {
match node_client {
NodeClient::Geth => "geth",
NodeClient::Erigon => "erigon",
NodeClient::OpenEthereum => "openethereum",
NodeClient::Nethermind => "nethermind",
NodeClient::Besu => "besu",
}
.to_string()
}

#[cfg(test)]
mod tests {
use crate::indexer::rpc_utils::{flatten_geth_trace, TraceType};
Expand Down

0 comments on commit afa1688

Please sign in to comment.