Skip to content

Commit

Permalink
core/node/crypto: refetch tx nonce in case chain rpc node reports non…
Browse files Browse the repository at this point in the history
…ce issues
  • Loading branch information
bas-vk committed Sep 10, 2024
1 parent 8bbec46 commit 2c2fcb4
Showing 1 changed file with 62 additions and 19 deletions.
81 changes: 62 additions & 19 deletions core/node/crypto/chain_txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -79,21 +81,22 @@ type (
walletBalanceLastTimeChecked time.Time
walletBalance prometheus.Gauge

// mu guards lastPendingTx that is used to determine the tx nonce
mu sync.Mutex
lastPendingTx *txPoolPendingTransaction
// mu guards lastNonce that is used to determine the tx nonce
mu sync.Mutex
lastNonce *uint64
}

// txPoolPendingTransaction represents a transaction that is submitted to the chain but no receipt was retrieved.
txPoolPendingTransaction struct {
txHashes []common.Hash // transaction hashes, due to resubmit there can be multiple
tx *types.Transaction
txOpts *bind.TransactOpts
name string
resubmit CreateTransaction
firstSubmit time.Time
lastSubmit time.Time
tracer trace.Tracer
txHashes []common.Hash // transaction hashes, due to resubmit there can be multiple
tx *types.Transaction
txOpts *bind.TransactOpts
name string
resubmit CreateTransaction
firstSubmit time.Time
lastSubmit time.Time
tracer trace.Tracer
receiptPolls uint
// listener waits on this channel for the transaction receipt
listener chan *types.Receipt
// The hash of the transaction that was executed on the chain. This is only set on the
Expand Down Expand Up @@ -124,6 +127,7 @@ type (
transactionsReplaced *prometheus.CounterVec
transactionsPending prometheus.Gauge
transactionsProcessed *prometheus.CounterVec
transactionReceiptsMissing prometheus.Counter
transactionGasCap *prometheus.GaugeVec
transactionGasTip *prometheus.GaugeVec

Expand Down Expand Up @@ -165,6 +169,10 @@ func newPendingTransactionPool(
"How long it takes before a transaction is included in the chain since last submit",
prometheus.LinearBuckets(1.0, 2.0, 10), "chain_id", "address",
)
transactionReceiptsMissingCounter := metrics.NewCounterVecEx(
"txpool_missing_tx_receipts", "Number of receipts missing for submitted transactions",
"chain_id", "address",
)

curryLabels := prometheus.Labels{"chain_id": chainID.String(), "address": wallet.Address.String()}

Expand All @@ -188,6 +196,7 @@ func newPendingTransactionPool(
transactionsReplaced: transactionsReplacedCounter.MustCurryWith(curryLabels),
transactionsPending: transactionsPendingCounter.With(curryLabels),
transactionsProcessed: transactionsProcessedCounter.MustCurryWith(curryLabels),
transactionReceiptsMissing: transactionReceiptsMissingCounter.With(curryLabels),
transactionTotalInclusionDuration: transactionTotalInclusionDuration.With(curryLabels),
transactionInclusionDuration: transactionInclusionDuration.With(curryLabels),
transactionGasCap: transactionGasCap.MustCurryWith(curryLabels),
Expand Down Expand Up @@ -246,11 +255,13 @@ func (pool *pendingTransactionPool) checkPendingTransactions(ctx context.Context
)

if ptxConfirmed {
ptx.receiptPolls++
// there can be multiple transactions (original + replacements), start from latest submitted
for i := len(ptx.txHashes) - 1; i >= 0; i-- {
txHash := ptx.txHashes[i]
receipt, err := pool.client.TransactionReceipt(ctx, txHash)
if receipt != nil {
fmt.Printf("BVK DBG got receipt for tx nonce: %d / %s\n", ptxNonce, txHash)
pool.pendingTxs.Delete(ptx.tx.Nonce())
ptx.executedHash.Store(&txHash)
ptx.listener <- receipt
Expand Down Expand Up @@ -294,6 +305,20 @@ func (pool *pendingTransactionPool) checkPendingTransactions(ctx context.Context
return true
}
}

// it is possible that the nonce already progressed as an indication the tx was included in the chain
// but the rpc node doesn't yet have the receipt available. Allow several retries before giving up waiting
// for the receipt.
if ptx.receiptPolls > 15 {
// Receipt not available can be caused by the chain rpc node lagging behind the canonical chain at
// the time the transactions were created and an outdated nonce was retrieved from the rpc node. A tx
// with hat same nonce was already included in the canonical chain. When the rpc node caught up the tx
// was dropped from the rpc node tx pool and therefor we never get a receipt for it. Closing
// ptx.listener will yield an error to the client waiting for the receipt that it is not available.
pool.transactionReceiptsMissing.Add(1)
pool.pendingTxs.Delete(nonce)
close(ptx.listener) // this will return an error that the receipt wasn't available when waiting for it
}
} else { // determine if tx is eligible for resubmit
if pool.replacePolicy.Eligible(head, ptx.lastSubmit, ptx.tx) {
ptx.txOpts.GasPrice, ptx.txOpts.GasFeeCap, ptx.txOpts.GasTipCap =
Expand Down Expand Up @@ -478,8 +503,8 @@ func (tx *txPoolPendingTransaction) TransactionHash() common.Hash {

// caller is expected to hold a lock on r.mu
func (r *transactionPool) nextNonce(ctx context.Context) (uint64, error) {
if r.lastPendingTx != nil {
return r.lastPendingTx.tx.Nonce() + 1, nil
if r.lastNonce != nil {
return *r.lastNonce + 1, nil
}
return r.client.PendingNonceAt(ctx, r.wallet.Address)
}
Expand Down Expand Up @@ -507,17 +532,24 @@ func (r *transactionPool) Submit(
name string,
createTx CreateTransaction,
) (TransactionPoolPendingTransaction, error) {
var span trace.Span
// lock to prevent tx.Nonce collisions
r.mu.Lock()
defer r.mu.Unlock()

return r.submitNoLock(ctx, name, createTx, true)
}
func (r *transactionPool) submitNoLock(
ctx context.Context,
name string,
createTx CreateTransaction,
canRetry bool,
) (TransactionPoolPendingTransaction, error) {
var span trace.Span
if r.tracer != nil {
ctx, span = r.tracer.Start(ctx, "txpool_submit")
defer span.End()
}

// lock to prevent tx.Nonce collisions
r.mu.Lock()
defer r.mu.Unlock()

nonce, err := r.nextNonce(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -548,6 +580,13 @@ func (r *transactionPool) Submit(
}

if err := r.client.SendTransaction(ctx, tx); err != nil {
// force fetching the latest nonce from the rpc node again when it was reported to be too low. This can be
// caused by the chain rpc node lagging behind when the tx pool fetched the nonce. When the chain rpc node
// caught up the fetched nonce can be too low. Fetch the nonce again recovers from this scenario.
if canRetry && strings.Contains(strings.ToLower(err.Error()), "nonce too low") {
r.lastNonce = nil
return r.submitNoLock(ctx, name, createTx, false)
}
return nil, err
}

Expand All @@ -564,7 +603,11 @@ func (r *transactionPool) Submit(
listener: make(chan *types.Receipt, 1),
}

r.lastPendingTx = pendingTx
if r.lastNonce == nil {
r.lastNonce = new(uint64)
}
*r.lastNonce = pendingTx.tx.Nonce()

r.pendingTransactionPool.addPendingTx <- pendingTx

// metrics
Expand Down

0 comments on commit 2c2fcb4

Please sign in to comment.