Skip to content

Commit

Permalink
handle confirmed txn that dont have required num of confirmations
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Jan 4, 2024
1 parent 0fc0b79 commit 0b469f4
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 43 deletions.
16 changes: 10 additions & 6 deletions common/geth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,12 @@ func (c *EthClient) EstimateGasPriceAndLimitAndSendTx(
return receipt, err
}

// EnsureTransactionEvaled waits for tx to be mined on the blockchain and returns the receipt.
// If the context times out but the receipt is available, it returns both receipt and error, noting that the transaction is confirmed but has not accumulated the required number of confirmations.
func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Transaction, tag string) (*types.Receipt, error) {
receipt, err := c.waitMined(ctx, tx)
if err != nil {
return nil, fmt.Errorf("EnsureTransactionEvaled: failed to wait for transaction (%s) to mine: %w", tag, err)
return receipt, fmt.Errorf("EnsureTransactionEvaled: failed to wait for transaction (%s) to mine: %w", tag, err)
}
if receipt.Status != 1 {
c.Logger.Error("Transaction Failed", "tag", tag, "txHash", tx.Hash().Hex(), "status", receipt.Status, "GasUsed", receipt.GasUsed)
Expand All @@ -212,15 +214,17 @@ func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Trans
return receipt, nil
}

// waitMined waits for tx to be mined on the blockchain.
// waitMined waits for tx to be mined on the blockchain and returns the receipt.
// If the context times out but the receipt is available, it returns both receipt and error, noting that the transaction is confirmed but has not accumulated the required number of confirmations.
// Taken from https://github.com/ethereum/go-ethereum/blob/master/accounts/abi/bind/util.go#L32,
// but added a check for number of confirmations.
func (c *EthClient) waitMined(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
queryTicker := time.NewTicker(3 * time.Second)
defer queryTicker.Stop()

var receipt *types.Receipt
var err error
for {
receipt, err := c.TransactionReceipt(ctx, tx.Hash())
receipt, err = c.TransactionReceipt(ctx, tx.Hash())
if err == nil {
chainTip, err := c.BlockNumber(ctx)
if err == nil {
Expand All @@ -236,14 +240,14 @@ func (c *EthClient) waitMined(ctx context.Context, tx *types.Transaction) (*type

if errors.Is(err, ethereum.NotFound) {
c.Logger.Trace("Transaction not yet mined")
} else {
} else if err != nil {
c.Logger.Trace("Receipt retrieval failed", "err", err)
}

// Wait for the next round.
select {
case <-ctx.Done():
return nil, ctx.Err()
return receipt, ctx.Err()
case <-queryTicker.C:
}
}
Expand Down
57 changes: 42 additions & 15 deletions disperser/batcher/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@ import (

// percentage multiplier for gas price. It needs to be >= 10 to properly replace existing transaction
// e.g. 10 means 10% increase
var gasPricePercentageMultiplier = big.NewInt(10)
var (
gasPricePercentageMultiplier = big.NewInt(10)
hundred = big.NewInt(100)
)

type TxnRequest struct {
Tx *types.Transaction
Tag string
Value *big.Int
HandleSuccess func(ctx context.Context, receipt *types.Receipt)
HandleFailure func(ctx context.Context, err error)
Tx *types.Transaction
Tag string
Value *big.Int
}

// ReceiptOrErr is a wrapper for a transaction receipt or an error.
// Receipt should be nil if there is an error, and non-nil if there is no error.
type ReceiptOrErr struct {
Receipt *types.Receipt
Err error
}

// TxnManager receives transactions from the caller, sends them to the chain, and monitors their status.
Expand All @@ -31,6 +39,8 @@ type TxnRequest struct {
type TxnManager struct {
mu sync.Mutex

ReceiptChan chan *ReceiptOrErr

ethClient common.EthClient
requestChan chan *TxnRequest
logger common.Logger
Expand All @@ -39,8 +49,9 @@ type TxnManager struct {
txnRefreshInterval time.Duration
}

func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger common.Logger) *TxnManager {
func NewTxnManager(receiptChan chan *ReceiptOrErr, ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger common.Logger) *TxnManager {
return &TxnManager{
ReceiptChan: receiptChan,
ethClient: ethClient,
requestChan: make(chan *TxnRequest, queueSize),
logger: logger,
Expand All @@ -58,9 +69,15 @@ func (t *TxnManager) Start(ctx context.Context) {
case req := <-t.requestChan:
receipt, err := t.monitorTransaction(ctx, req)
if err != nil {
req.HandleFailure(ctx, err)
t.ReceiptChan <- &ReceiptOrErr{
Receipt: nil,
Err: err,
}
} else {
req.HandleSuccess(ctx, receipt)
t.ReceiptChan <- &ReceiptOrErr{
Receipt: receipt,
Err: nil,
}
}
}
}
Expand All @@ -74,6 +91,7 @@ func (t *TxnManager) Start(ctx context.Context) {
func (t *TxnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) error {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Debug("[ProcessTransaction] new transaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())
gasTipCap, gasFeeCap, err := t.ethClient.GetLatestGasCaps(ctx)
if err != nil {
return fmt.Errorf("failed to get latest gas caps: %w", err)
Expand Down Expand Up @@ -109,16 +127,20 @@ func (t *TxnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
}

if errors.Is(err, context.DeadlineExceeded) {
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "tag", req.Tag, "txHash", req.Tx.Hash().Hex())
txn, err := t.speedUpTxn(ctxWithTimeout, req.Tx, req.Tag)
if receipt != nil {
t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
continue
}
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
req.Tx, err = t.speedUpTxn(ctx, req.Tx, req.Tag)
if err != nil {
t.logger.Error("failed to speed up transaction", "err", err)
return nil, err
continue
}
err = t.ethClient.SendTransaction(ctx, txn)
err = t.ethClient.SendTransaction(ctx, req.Tx)
if err != nil {
t.logger.Error("failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "err", err)
return nil, err
continue
}
} else {
t.logger.Error("transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
Expand Down Expand Up @@ -152,9 +174,14 @@ func (t *TxnManager) speedUpTxn(ctx context.Context, tx *types.Transaction, tag
newGasFeeCap = increasedGasFeeCap
}

t.logger.Debug("[speedUpTxn] increasing gas price", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
return t.ethClient.UpdateGas(ctx, tx, tx.Value(), newGasTipCap, newGasFeeCap)
}

// increaseGasPrice increases the gas price by specified percentage.
// i.e. gasPrice + (gasPrice * gasPricePercentageMultiplier / 100)
func increaseGasPrice(gasPrice *big.Int) *big.Int {
return new(big.Int).Add(gasPrice, new(big.Int).Div(gasPrice, gasPricePercentageMultiplier))
bump := new(big.Int).Mul(gasPrice, gasPricePercentageMultiplier)
bump.Div(bump, hundred)
return new(big.Int).Add(gasPrice, bump)
}
50 changes: 28 additions & 22 deletions disperser/batcher/txn_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func TestProcessTransaction(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger)
receiptChan := make(chan *batcher.ReceiptOrErr, 1)
txnManager := batcher.NewTxnManager(receiptChan, ethClient, 5, 48*time.Second, logger)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
Expand All @@ -29,34 +30,47 @@ func TestProcessTransaction(t *testing.T) {
ethClient.On("SendTransaction").Return(nil)
ethClient.On("EnsureTransactionEvaled").Return(&types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
}, nil)
}, nil).Once()

successful := false
err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
HandleSuccess: func(ctx context.Context, receipt *types.Receipt) {
fmt.Println("success function called", successful)
successful = true
},
HandleFailure: func(ctx context.Context, err error) {
assert.Fail(t, "should not be called")
},
})
<-ctx.Done()
assert.NoError(t, err)
assert.True(t, successful)
receiptOrErr := <-receiptChan
assert.NoError(t, receiptOrErr.Err)
assert.Equal(t, uint64(1), receiptOrErr.Receipt.BlockNumber.Uint64())
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 1)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 1)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 1)
ethClient.AssertNumberOfCalls(t, "EnsureTransactionEvaled", 1)

// now test the case where the transaction fails
randomErr := fmt.Errorf("random error")
ethClient.On("EnsureTransactionEvaled").Return(nil, randomErr)
err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
})
<-ctx.Done()
assert.NoError(t, err)
receiptOrErr = <-receiptChan
assert.Error(t, receiptOrErr.Err, randomErr)
assert.Nil(t, receiptOrErr.Receipt)
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 2)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 2)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 2)
ethClient.AssertNumberOfCalls(t, "EnsureTransactionEvaled", 2)
}

func TestReplaceGasFee(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger)
receiptChan := make(chan *batcher.ReceiptOrErr, 1)
txnManager := batcher.NewTxnManager(receiptChan, ethClient, 5, 48*time.Second, logger)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
Expand All @@ -70,23 +84,15 @@ func TestReplaceGasFee(t *testing.T) {
BlockNumber: new(big.Int).SetUint64(1),
}, nil)

successful := false
err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
HandleSuccess: func(ctx context.Context, receipt *types.Receipt) {
fmt.Println("success function called", successful)
successful = true
},
HandleFailure: func(ctx context.Context, err error) {
assert.Fail(t, "should not be called")
},
})
<-ctx.Done()
assert.NoError(t, err)
assert.True(t, successful)
ethClient.AssertNumberOfCalls(t, "GetLatestGasCaps", 2)
ethClient.AssertNumberOfCalls(t, "UpdateGas", 2)
ethClient.AssertNumberOfCalls(t, "SendTransaction", 2)
ethClient.AssertNumberOfCalls(t, "EnsureTransactionEvaled", 2)
}

0 comments on commit 0b469f4

Please sign in to comment.