From 129c7349a6c1376152f566d657d3f8928a0c7d4d Mon Sep 17 00:00:00 2001 From: Dimitris Date: Thu, 5 Dec 2024 14:00:33 +0200 Subject: [PATCH] Fix CI --- core/chains/evm/txm/txm.go | 39 ++++++++++++++----- core/chains/evm/txm/txm_test.go | 23 +++++++---- core/chains/evm/txm/types/transaction.go | 2 +- .../config-multi-chain-effective.toml | 12 +++--- 4 files changed, 53 insertions(+), 23 deletions(-) diff --git a/core/chains/evm/txm/txm.go b/core/chains/evm/txm/txm.go index 340bd222d58..c37099d3783 100644 --- a/core/chains/evm/txm/txm.go +++ b/core/chains/evm/txm/txm.go @@ -20,10 +20,12 @@ import ( ) const ( - broadcastInterval time.Duration = 30 * time.Second - maxInFlightTransactions int = 16 - maxInFlightSubset int = 3 - maxAllowedAttempts uint16 = 10 + broadcastInterval time.Duration = 30 * time.Second + maxInFlightTransactions int = 16 + maxInFlightSubset int = 3 + maxAllowedAttempts uint16 = 10 + pendingNonceDefaultTimeout time.Duration = 30 * time.Second + pendingNonceRecheckInterval time.Duration = 1 * time.Second ) type Client interface { @@ -72,7 +74,7 @@ var ( }, []string{"chainID"}) promNumConfirmedTxs = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "txm_num_confirmed_transactions", - Help: "Total number of confirmed transactions. Note that this can happen multiple times per transaction in the case of re-orgs.", + Help: "Total number of confirmed transactions. Note that this can happen multiple times per transaction in the case of re-orgs or when filling the nonce for untracked transactions.", }, []string{"chainID"}) promNumNonceGaps = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "txm_num_nonce_gaps", @@ -135,7 +137,7 @@ func (t *Txm) Start(ctx context.Context) error { return err } for _, address := range addresses { - err := t.startAddress(address) + err := t.startAddress(ctx, address) if err != nil { return err } @@ -144,10 +146,10 @@ func (t *Txm) Start(ctx context.Context) error { }) } -func (t *Txm) startAddress(address common.Address) error { +func (t *Txm) startAddress(ctx context.Context, address common.Address) error { triggerCh := make(chan struct{}, 1) t.triggerCh[address] = triggerCh - pendingNonce, err := t.client.PendingNonceAt(context.TODO(), address) + pendingNonce, err := t.pollForPendingNonce(ctx, address) if err != nil { return err } @@ -159,6 +161,24 @@ func (t *Txm) startAddress(address common.Address) error { return nil } +func (t *Txm) pollForPendingNonce(ctx context.Context, address common.Address) (pendingNonce uint64, err error) { + ctxWithTimeout, cancel := context.WithTimeout(ctx, pendingNonceDefaultTimeout) + defer cancel() + for { + pendingNonce, err = t.client.PendingNonceAt(ctxWithTimeout, address) + if err != nil { + t.lggr.Errorw("Error when fetching initial pending nonce", "address", address, "err", err) + select { + case <-time.After(pendingNonceRecheckInterval): + case <-ctx.Done(): + return 0, context.Cause(ctx) + } + continue + } + return pendingNonce, nil + } +} + func (t *Txm) Close() error { return t.StopOnce("Txm", func() error { close(t.stopCh) @@ -188,6 +208,7 @@ func (t *Txm) Trigger(address common.Address) { } func (t *Txm) Abandon(address common.Address) error { + t.lggr.Infof("Dropping unstarted and unconfirmed transactions for address: %v", address) return t.txStore.AbandonPendingTransactions(context.TODO(), address) } @@ -350,7 +371,7 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio return err } if pendingNonce <= *tx.Nonce { - t.lggr.Debugf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d", tx.ID, pendingNonce, tx.Nonce) + t.lggr.Debugf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d", tx.ID, pendingNonce, *tx.Nonce) return nil } } diff --git a/core/chains/evm/txm/txm_test.go b/core/chains/evm/txm/txm_test.go index 94710266104..fef90f9c344 100644 --- a/core/chains/evm/txm/txm_test.go +++ b/core/chains/evm/txm/txm_test.go @@ -28,21 +28,27 @@ func TestLifecycle(t *testing.T) { client := mocks.NewClient(t) ab := mocks.NewAttemptBuilder(t) - config := Config{BlockTime: 10 * time.Millisecond} address1 := testutils.NewAddress() address2 := testutils.NewAddress() assert.NotEqual(t, address1, address2) addresses := []common.Address{address1, address2} keystore := mocks.NewKeystore(t) - keystore.On("EnabledAddressesForChain", mock.Anything, mock.Anything).Return(addresses, nil) - t.Run("fails to start if initial pending nonce call fails", func(t *testing.T) { - txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, nil, nil, config, keystore) + t.Run("retries if initial pending nonce call fails", func(t *testing.T) { + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + config := Config{BlockTime: 1 * time.Minute} + txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID) + keystore.On("EnabledAddressesForChain", mock.Anything, mock.Anything).Return([]common.Address{address1}, nil).Once() + txm := NewTxm(lggr, testutils.FixtureChainID, client, nil, txStore, nil, config, keystore) client.On("PendingNonceAt", mock.Anything, address1).Return(uint64(0), errors.New("error")).Once() - require.Error(t, txm.Start(tests.Context(t))) + client.On("PendingNonceAt", mock.Anything, address1).Return(uint64(0), nil).Once() + require.NoError(t, txm.Start(tests.Context(t))) + tests.AssertLogEventually(t, observedLogs, "Error when fetching initial pending nonce") }) t.Run("tests lifecycle successfully without any transactions", func(t *testing.T) { + config := Config{BlockTime: 200 * time.Millisecond} + keystore.On("EnabledAddressesForChain", mock.Anything, mock.Anything).Return(addresses, nil).Once() lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID) require.NoError(t, txStore.Add(addresses...)) @@ -52,8 +58,8 @@ func TestLifecycle(t *testing.T) { client.On("PendingNonceAt", mock.Anything, address1).Return(nonce, nil).Once() client.On("PendingNonceAt", mock.Anything, address2).Return(nonce, nil).Once() // backfill loop (may or may not be executed multiple times) - client.On("NonceAt", mock.Anything, address1, mock.Anything).Return(nonce, nil) - client.On("NonceAt", mock.Anything, address2, mock.Anything).Return(nonce, nil) + client.On("NonceAt", mock.Anything, address1, mock.Anything).Return(nonce, nil).Maybe() + client.On("NonceAt", mock.Anything, address2, mock.Anything).Return(nonce, nil).Maybe() servicetest.Run(t, txm) tests.AssertLogEventually(t, observedLogs, "Backfill time elapsed") @@ -85,6 +91,7 @@ func TestTrigger(t *testing.T) { // Start client.On("PendingNonceAt", mock.Anything, address).Return(nonce, nil).Once() servicetest.Run(t, txm) + txm.Trigger(address) }) } @@ -169,6 +176,7 @@ func TestBroadcastTransaction(t *testing.T) { txm.setNonce(address, 8) IDK := "IDK" txRequest := &types.TxRequest{ + Data: []byte{100}, IdempotencyKey: &IDK, ChainID: testutils.FixtureChainID, FromAddress: address, @@ -195,6 +203,7 @@ func TestBroadcastTransaction(t *testing.T) { var zeroTime time.Time assert.Greater(t, tx.LastBroadcastAt, zeroTime) assert.Greater(t, tx.Attempts[0].BroadcastAt, zeroTime) + assert.Greater(t, tx.InitialBroadcastAt, zeroTime) }) } diff --git a/core/chains/evm/txm/types/transaction.go b/core/chains/evm/txm/types/transaction.go index 1b0a91646b5..0c1d2861daa 100644 --- a/core/chains/evm/txm/types/transaction.go +++ b/core/chains/evm/txm/types/transaction.go @@ -69,7 +69,7 @@ func (t *Transaction) PrettyPrint() string { nonce = strconv.FormatUint(*t.Nonce, 10) } return fmt.Sprintf(`{txID:%d, IdempotencyKey:%v, ChainID:%v, Nonce:%s, FromAddress:%v, ToAddress:%v, Value:%v, `+ - `Data:%v, SpecifiedGasLimit:%d, CreatedAt:%v, InitialBroadcastAt:%v, LastBroadcastAt:%v, State:%v, IsPurgeable:%v, AttemptCount:%d, `+ + `Data:%s, SpecifiedGasLimit:%d, CreatedAt:%v, InitialBroadcastAt:%v, LastBroadcastAt:%v, State:%v, IsPurgeable:%v, AttemptCount:%d, `+ `Meta:%v, Subject:%v}`, t.ID, idk, t.ChainID, nonce, t.FromAddress, t.ToAddress, t.Value, t.Data, t.SpecifiedGasLimit, t.CreatedAt, t.InitialBroadcastAt, t.LastBroadcastAt, t.State, t.IsPurgeable, t.AttemptCount, t.Meta, t.Subject) diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index e26a3793d7a..f7e70e5945e 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -313,6 +313,9 @@ RPCBlockQueryDelay = 1 FinalizedBlockOffset = 0 NoNewFinalizedHeadsThreshold = '9m0s' +[EVM.TxmV2] +Enabled = false + [EVM.Transactions] ForwardersEnabled = false MaxInFlight = 16 @@ -321,9 +324,6 @@ ReaperInterval = '1h0m0s' ReaperThreshold = '168h0m0s' ResendAfterThreshold = '1m0s' -[EVM.TxmV2] -Enabled = false - [EVM.Transactions.AutoPurge] Enabled = false @@ -533,6 +533,9 @@ RPCBlockQueryDelay = 10 FinalizedBlockOffset = 0 NoNewFinalizedHeadsThreshold = '6m0s' +[EVM.TxmV2] +Enabled = false + [EVM.Transactions] ForwardersEnabled = false MaxInFlight = 16 @@ -541,9 +544,6 @@ ReaperInterval = '1h0m0s' ReaperThreshold = '168h0m0s' ResendAfterThreshold = '1m0s' -[EVM.TxmV2] -Enabled = false - [EVM.Transactions.AutoPurge] Enabled = false