diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index 42003bf1de8..494c6220ea0 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -375,11 +375,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pro // Mark transactions as unconfirmed, mark attempts as in-progress, and delete receipts since they do not apply to the new chain // This may revert some fatal error transactions to unconfirmed if terminally stuck transactions purge attempts get re-org'd - if err := ec.txStore.UpdateTxForRebroadcast(ctx, etxIDs, attemptIDs); err != nil { - return err - } - - return nil + return ec.txStore.UpdateTxForRebroadcast(ctx, etxIDs, attemptIDs) } func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ProcessIncludedTxs(ctx context.Context, includedTxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) error { diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 783192d8742..d22c1197704 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -2239,9 +2239,9 @@ func (_c *TxStore_UpdateTxAttemptInProgressToBroadcast_Call[ADDR, CHAIN_ID, TX_H return _c } -// UpdateTxCallbackCompleted provides a mock function with given fields: ctx, pipelineTaskRunRid, chainId -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error { - ret := _m.Called(ctx, pipelineTaskRunRid, chainId) +// UpdateTxCallbackCompleted provides a mock function with given fields: ctx, pipelineTaskRunRid, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainID CHAIN_ID) error { + ret := _m.Called(ctx, pipelineTaskRunRid, chainID) if len(ret) == 0 { panic("no return value specified for UpdateTxCallbackCompleted") @@ -2249,7 +2249,7 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxCal var r0 error if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, CHAIN_ID) error); ok { - r0 = rf(ctx, pipelineTaskRunRid, chainId) + r0 = rf(ctx, pipelineTaskRunRid, chainID) } else { r0 = ret.Error(0) } @@ -2265,12 +2265,12 @@ type TxStore_UpdateTxCallbackCompleted_Call[ADDR types.Hashable, CHAIN_ID types. // UpdateTxCallbackCompleted is a helper method to define mock.On call // - ctx context.Context // - pipelineTaskRunRid uuid.UUID -// - chainId CHAIN_ID -func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxCallbackCompleted(ctx interface{}, pipelineTaskRunRid interface{}, chainId interface{}) *TxStore_UpdateTxCallbackCompleted_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { - return &TxStore_UpdateTxCallbackCompleted_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("UpdateTxCallbackCompleted", ctx, pipelineTaskRunRid, chainId)} +// - chainID CHAIN_ID +func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxCallbackCompleted(ctx interface{}, pipelineTaskRunRid interface{}, chainID interface{}) *TxStore_UpdateTxCallbackCompleted_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + return &TxStore_UpdateTxCallbackCompleted_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("UpdateTxCallbackCompleted", ctx, pipelineTaskRunRid, chainID)} } -func (_c *TxStore_UpdateTxCallbackCompleted_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID)) *TxStore_UpdateTxCallbackCompleted_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { +func (_c *TxStore_UpdateTxCallbackCompleted_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainID CHAIN_ID)) *TxStore_UpdateTxCallbackCompleted_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(uuid.UUID), args[2].(CHAIN_ID)) }) diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 026049954b5..efef15d7eb9 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -93,7 +93,7 @@ type TransactionStore[ UpdateBroadcastAts(ctx context.Context, now time.Time, etxIDs []int64) error UpdateTxAttemptInProgressToBroadcast(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], NewAttemptState TxAttemptState) error // UpdateTxCallbackCompleted updates tx to mark that its callback has been signaled - UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error + UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainID CHAIN_ID) error // UpdateTxConfirmed updates transaction states to confirmed UpdateTxConfirmed(ctx context.Context, etxIDs []int64) error // UpdateTxFatalErrorAndDeleteAttempts updates transaction states to fatal error, deletes attempts, and clears broadcast info and sequence diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index 74aeeee06b8..69f2973494d 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -1265,7 +1265,8 @@ func TestEthConfirmer_RebroadcastWhereNecessary(t *testing.T) { ethKeyStore := cltest.NewKeyStore(t, db).Eth() _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) etx := mustInsertUnconfirmedEthTxWithBroadcastDynamicFeeAttempt(t, txStore, 0, fromAddress) - txStore.UpdateTxAttemptBroadcastBeforeBlockNum(ctx, etx.ID, uint(25)) + err := txStore.UpdateTxAttemptBroadcastBeforeBlockNum(ctx, etx.ID, uint(25)) + require.NoError(t, err) ec := newEthConfirmer(t, txStore, ethClient, cfg, newCfg, ethKeyStore, nil) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { @@ -1277,7 +1278,6 @@ func TestEthConfirmer_RebroadcastWhereNecessary(t *testing.T) { // Do it require.NoError(t, ec.RebroadcastWhereNecessary(ctx, currentHead)) - var err error etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) assert.Equal(t, txmgrcommon.TxUnconfirmed, etx.State) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 0a748aee28b..88698307bb0 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -48,7 +48,7 @@ type EvmTxStore interface { FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) FindTxesByIDs(ctx context.Context, etxIDs []int64, chainID *big.Int) (etxs []*Tx, err error) SaveFetchedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int) (err error) - UpdateTxStatesToFinalizedUsingTxHashes(ctx context.Context, txHashes []common.Hash, chainId *big.Int) error + UpdateTxStatesToFinalizedUsingTxHashes(ctx context.Context, txHashes []common.Hash, chainID *big.Int) error } // TxStoreWebApi encapsulates the methods that are not used by the txmgr and only used by the various web controllers, readers, or evm specific components @@ -996,23 +996,23 @@ func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64 } // Update tx to mark that its callback has been signaled -func (o *evmTxStore) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunId uuid.UUID, chainId *big.Int) error { +func (o *evmTxStore) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunId uuid.UUID, chainID *big.Int) error { var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) defer cancel() - _, err := o.q.ExecContext(ctx, `UPDATE evm.txes SET callback_completed = TRUE WHERE pipeline_task_run_id = $1 AND evm_chain_id = $2`, pipelineTaskRunId, chainId.String()) + _, err := o.q.ExecContext(ctx, `UPDATE evm.txes SET callback_completed = TRUE WHERE pipeline_task_run_id = $1 AND evm_chain_id = $2`, pipelineTaskRunId, chainID.String()) if err != nil { return fmt.Errorf("failed to mark callback completed for transaction: %w", err) } return nil } -func (o *evmTxStore) FindLatestSequence(ctx context.Context, fromAddress common.Address, chainId *big.Int) (nonce evmtypes.Nonce, err error) { +func (o *evmTxStore) FindLatestSequence(ctx context.Context, fromAddress common.Address, chainID *big.Int) (nonce evmtypes.Nonce, err error) { var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) defer cancel() sql := `SELECT nonce FROM evm.txes WHERE from_address = $1 AND evm_chain_id = $2 AND nonce IS NOT NULL ORDER BY nonce DESC LIMIT 1` - err = o.q.GetContext(ctx, &nonce, sql, fromAddress, chainId.String()) + err = o.q.GetContext(ctx, &nonce, sql, fromAddress, chainID.String()) return } @@ -1944,7 +1944,7 @@ func (o *evmTxStore) FindConfirmedTxesReceipts(ctx context.Context, finalizedBlo } // Mark transactions corresponding to attempt hashes as finalized -func (o *evmTxStore) UpdateTxStatesToFinalizedUsingTxHashes(ctx context.Context, txHashes []common.Hash, chainId *big.Int) error { +func (o *evmTxStore) UpdateTxStatesToFinalizedUsingTxHashes(ctx context.Context, txHashes []common.Hash, chainID *big.Int) error { if len(txHashes) == 0 { return nil } @@ -1960,21 +1960,21 @@ UPDATE evm.txes SET state = 'finalized' WHERE evm.txes.evm_chain_id = $1 AND evm INNER JOIN evm.tx_attempts ON evm.tx_attempts.eth_tx_id = evm.txes.id WHERE evm.tx_attempts.hash = ANY($2)) ` - _, err := o.q.ExecContext(ctx, sql, chainId.String(), txHashBytea) + _, err := o.q.ExecContext(ctx, sql, chainID.String(), txHashBytea) return err } // FindReorgOrIncludedTxs finds transactions that have either been re-org'd or included on-chain based on the mined transaction count // If the mined transaction count receeds, transactions could have beeen re-org'd // If it proceeds, transactions could have been included -func (o *evmTxStore) FindReorgOrIncludedTxs(ctx context.Context, fromAddress common.Address, minedTxCount evmtypes.Nonce, chainId *big.Int) (reorgTxs []*Tx, includedTxs []*Tx, err error) { +func (o *evmTxStore) FindReorgOrIncludedTxs(ctx context.Context, fromAddress common.Address, minedTxCount evmtypes.Nonce, chainID *big.Int) (reorgTxs []*Tx, includedTxs []*Tx, err error) { var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) defer cancel() err = o.Transact(ctx, true, func(orm *evmTxStore) error { var dbReOrgEtxs []DbEthTx query := `SELECT * FROM evm.txes WHERE from_address = $1 AND state IN ('confirmed', 'confirmed_missing_receipt', 'fatal_error', 'finalized') AND nonce >= $2 AND evm_chain_id = $3` - err = o.q.SelectContext(ctx, &dbReOrgEtxs, query, fromAddress, minedTxCount.Int64(), chainId.String()) + err = o.q.SelectContext(ctx, &dbReOrgEtxs, query, fromAddress, minedTxCount.Int64(), chainID.String()) // If re-org'd transactions found, populate them with attempts and partial receipts, then return since new transactions could not have been included if len(dbReOrgEtxs) > 0 { reorgTxs = make([]*Tx, len(dbReOrgEtxs)) @@ -1991,7 +1991,7 @@ func (o *evmTxStore) FindReorgOrIncludedTxs(ctx context.Context, fromAddress com // If re-org'd transactions not found, find unconfirmed transactions could have been included and populate with attempts var dbIncludedEtxs []DbEthTx query = `SELECT * FROM evm.txes WHERE state = 'unconfirmed' AND from_address = $1 AND nonce < $2 AND evm_chain_id = $3` - err = o.q.SelectContext(ctx, &dbIncludedEtxs, query, fromAddress, minedTxCount.Int64(), chainId.String()) + err = o.q.SelectContext(ctx, &dbIncludedEtxs, query, fromAddress, minedTxCount.Int64(), chainID.String()) includedTxs = make([]*Tx, len(dbIncludedEtxs)) dbEthTxsToEvmEthTxPtrs(dbIncludedEtxs, includedTxs) if err = orm.LoadTxesAttempts(ctx, includedTxs); err != nil { diff --git a/core/chains/evm/txmgr/finalizer.go b/core/chains/evm/txmgr/finalizer.go index 3591acd9ac6..3fa70180ce1 100644 --- a/core/chains/evm/txmgr/finalizer.go +++ b/core/chains/evm/txmgr/finalizer.go @@ -62,9 +62,9 @@ type finalizerTxStore interface { FindTxesByIDs(ctx context.Context, etxIDs []int64, chainID *big.Int) (etxs []*Tx, err error) PreloadTxes(ctx context.Context, attempts []TxAttempt) error SaveFetchedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int) (err error) - UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunId uuid.UUID, chainId *big.Int) error + UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunID uuid.UUID, chainID *big.Int) error UpdateTxFatalError(ctx context.Context, etxIDs []int64, errMsg string) error - UpdateTxStatesToFinalizedUsingTxHashes(ctx context.Context, txHashes []common.Hash, chainId *big.Int) error + UpdateTxStatesToFinalizedUsingTxHashes(ctx context.Context, txHashes []common.Hash, chainID *big.Int) error } type finalizerChainClient interface { @@ -83,7 +83,7 @@ type resumeCallback = func(context.Context, uuid.UUID, interface{}, error) error type evmFinalizer struct { services.StateMachine lggr logger.SugaredLogger - chainId *big.Int + chainID *big.Int rpcBatchSize int forwardersEnabled bool @@ -101,7 +101,7 @@ type evmFinalizer struct { func NewEvmFinalizer( lggr logger.Logger, - chainId *big.Int, + chainID *big.Int, rpcBatchSize uint32, forwardersEnabled bool, txStore finalizerTxStore, @@ -111,7 +111,7 @@ func NewEvmFinalizer( lggr = logger.Named(lggr, "Finalizer") return &evmFinalizer{ lggr: logger.Sugared(lggr), - chainId: chainId, + chainID: chainID, rpcBatchSize: int(rpcBatchSize), forwardersEnabled: forwardersEnabled, txStore: txStore, @@ -198,19 +198,19 @@ func (f *evmFinalizer) ProcessHead(ctx context.Context, head *evmtypes.Head) err err = f.FetchAndStoreReceipts(ctx, head, latestFinalizedHead) // Do not return on error since other functions are not dependent on results if err != nil { - f.lggr.Errorf("failed to fetch and store receipts for confirmed transactions: %w", err) + f.lggr.Errorf("failed to fetch and store receipts for confirmed transactions: %s", err.Error()) } // Resume pending task runs if any receipts match the min confirmation criteria err = f.ResumePendingTaskRuns(ctx, head) // Do not return on error since other functions are not dependent on results if err != nil { - f.lggr.Errorf("failed to resume pending task runs: %w", err) + f.lggr.Errorf("failed to resume pending task runs: %s", err.Error()) } return f.processFinalizedHead(ctx, latestFinalizedHead) } // processFinalizedHead determines if any confirmed transactions can be marked as finalized by comparing their receipts against the latest finalized block -// Fetches receipts direclty from on-chain so re-org detection is not needed during finalization +// Fetches receipts directly from on-chain so re-org detection is not needed during finalization func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalizedHead *evmtypes.Head) error { // Cannot determine finality without a finalized head for comparison if latestFinalizedHead == nil || !latestFinalizedHead.IsValid() { @@ -231,7 +231,7 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized mark := time.Now() // Retrieve all confirmed transactions with receipts older than or equal to the finalized block - unfinalizedReceipts, err := f.txStore.FindConfirmedTxesReceipts(ctx, latestFinalizedHead.BlockNumber(), f.chainId) + unfinalizedReceipts, err := f.txStore.FindConfirmedTxesReceipts(ctx, latestFinalizedHead.BlockNumber(), f.chainID) if err != nil { return fmt.Errorf("failed to retrieve receipts for confirmed, unfinalized transactions: %w", err) } @@ -279,7 +279,7 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized txHashes := f.buildTxHashList(finalizedReceipts) mark = time.Now() - err = f.txStore.UpdateTxStatesToFinalizedUsingTxHashes(ctx, txHashes, f.chainId) + err = f.txStore.UpdateTxStatesToFinalizedUsingTxHashes(ctx, txHashes, f.chainID) if err != nil { return fmt.Errorf("failed to update transactions as finalized: %w", err) } @@ -352,14 +352,14 @@ func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, block } func (f *evmFinalizer) FetchAndStoreReceipts(ctx context.Context, head, latestFinalizedHead *evmtypes.Head) error { - attempts, err := f.txStore.FindAttemptsRequiringReceiptFetch(ctx, f.chainId) + attempts, err := f.txStore.FindAttemptsRequiringReceiptFetch(ctx, f.chainID) if err != nil { return fmt.Errorf("failed to fetch broadcasted attempts for confirmed transactions: %w", err) } if len(attempts) == 0 { return nil } - promTxAttemptCount.WithLabelValues(f.chainId.String()).Set(float64(len(attempts))) + promTxAttemptCount.WithLabelValues(f.chainID.String()).Set(float64(len(attempts))) f.lggr.Debugw(fmt.Sprintf("Fetching receipts for %v transaction attempts", len(attempts))) @@ -376,15 +376,15 @@ func (f *evmFinalizer) FetchAndStoreReceipts(ctx context.Context, head, latestFi } batch := attempts[i:j] - receipts, err := f.batchFetchReceipts(ctx, batch) - if err != nil { - errorList = append(errorList, err) + receipts, fetchErr := f.batchFetchReceipts(ctx, batch) + if fetchErr != nil { + errorList = append(errorList, fetchErr) continue } allReceipts = append(allReceipts, receipts...) - if err := f.txStore.SaveFetchedReceipts(ctx, receipts, f.chainId); err != nil { + if err := f.txStore.SaveFetchedReceipts(ctx, receipts, f.chainID); err != nil { errorList = append(errorList, err) continue } @@ -404,7 +404,6 @@ func (f *evmFinalizer) FetchAndStoreReceipts(ctx context.Context, head, latestFi } func (f *evmFinalizer) batchFetchReceipts(ctx context.Context, attempts []TxAttempt) (receipts []*evmtypes.Receipt, err error) { - // Metadata is required to determine whether a tx is forwarded or not. if f.forwardersEnabled { err = f.txStore.PreloadTxes(ctx, attempts) @@ -484,9 +483,9 @@ func (f *evmFinalizer) validateReceipt(ctx context.Context, receipt *evmtypes.Re } } // This might increment more than once e.g. in case of re-orgs going back and forth we might re-fetch the same receipt - promRevertedTxCount.WithLabelValues(f.chainId.String()).Add(1) + promRevertedTxCount.WithLabelValues(f.chainID.String()).Add(1) } else { - promNumSuccessfulTxs.WithLabelValues(f.chainId.String()).Add(1) + promNumSuccessfulTxs.WithLabelValues(f.chainID.String()).Add(1) } // This is only recording forwarded tx that were mined and have a status. @@ -494,8 +493,8 @@ func (f *evmFinalizer) validateReceipt(ctx context.Context, receipt *evmtypes.Re if f.forwardersEnabled { meta, metaErr := attempt.Tx.GetMeta() if metaErr == nil && meta != nil && meta.FwdrDestAddress != nil { - // promFwdTxCount takes two labels, chainId and a boolean of whether a tx was successful or not. - promFwdTxCount.WithLabelValues(f.chainId.String(), strconv.FormatBool(receipt.GetStatus() != 0)).Add(1) + // promFwdTxCount takes two labels, chainID and a boolean of whether a tx was successful or not. + promFwdTxCount.WithLabelValues(f.chainID.String(), strconv.FormatBool(receipt.GetStatus() != 0)).Add(1) } } return true @@ -506,7 +505,7 @@ func (f *evmFinalizer) ResumePendingTaskRuns(ctx context.Context, head *evmtypes if f.resumeCallback == nil { return nil } - receiptsPlus, err := f.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), f.chainId) + receiptsPlus, err := f.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), f.chainID) if err != nil { return err @@ -531,7 +530,7 @@ func (f *evmFinalizer) ResumePendingTaskRuns(ctx context.Context, head *evmtypes return fmt.Errorf("failed to resume suspended pipeline run: %w", err) } // Mark tx as having completed callback - if err := f.txStore.UpdateTxCallbackCompleted(ctx, data.ID, f.chainId); err != nil { + if err := f.txStore.UpdateTxCallbackCompleted(ctx, data.ID, f.chainID); err != nil { return err } } @@ -543,7 +542,7 @@ func (f *evmFinalizer) ProcessOldTxsWithoutReceipts(ctx context.Context, oldTxID if len(oldTxIDs) == 0 { return nil } - oldTxs, err := f.txStore.FindTxesByIDs(ctx, oldTxIDs, f.chainId) + oldTxs, err := f.txStore.FindTxesByIDs(ctx, oldTxIDs, f.chainID) if err != nil { return fmt.Errorf("failed to find transactions with IDs: %w", err) } @@ -561,14 +560,14 @@ func (f *evmFinalizer) ProcessOldTxsWithoutReceipts(ctx context.Context, oldTxID // Signal pending tasks for these transactions as failed // Store errors and continue to allow all transactions a chance to be signaled if f.resumeCallback != nil && oldTx.PipelineTaskRunID.Valid && oldTx.SignalCallback && !oldTx.CallbackCompleted { - err := f.resumeCallback(ctx, oldTx.PipelineTaskRunID.UUID, nil, errors.New(ErrCouldNotGetReceipt)) + err = f.resumeCallback(ctx, oldTx.PipelineTaskRunID.UUID, nil, errors.New(ErrCouldNotGetReceipt)) if errors.Is(err, sql.ErrNoRows) { f.lggr.Debugw("callback missing or already resumed", "etxID", oldTx.ID) } else if err != nil { errorList = append(errorList, fmt.Errorf("failed to resume pipeline for ID %s: %w", oldTx.PipelineTaskRunID.UUID.String(), err)) } else { // Mark tx as having completed callback - if err := f.txStore.UpdateTxCallbackCompleted(ctx, oldTx.PipelineTaskRunID.UUID, f.chainId); err != nil { + if err := f.txStore.UpdateTxCallbackCompleted(ctx, oldTx.PipelineTaskRunID.UUID, f.chainID); err != nil { errorList = append(errorList, fmt.Errorf("failed to update callback as complete for tx ID %d: %w", oldTx.ID, err)) } } @@ -592,7 +591,7 @@ func findOldTxIDsWithoutReceipts(attempts []TxAttempt, receipts []*evmtypes.Rece if len(attempts) == 0 { return nil } - txIdToAttemptsMap := make(map[int64][]TxAttempt) + txIDToAttemptsMap := make(map[int64][]TxAttempt) hashToReceiptMap := make(map[common.Hash]bool) // Store all receipts hashes in a map to easily access which attempt hash has a receipt for _, receipt := range receipts { @@ -600,12 +599,12 @@ func findOldTxIDsWithoutReceipts(attempts []TxAttempt, receipts []*evmtypes.Rece } // Store all attempts in a map of tx ID to attempts for _, attempt := range attempts { - txIdToAttemptsMap[attempt.TxID] = append(txIdToAttemptsMap[attempt.TxID], attempt) + txIDToAttemptsMap[attempt.TxID] = append(txIDToAttemptsMap[attempt.TxID], attempt) } // Determine which transactions still do not have a receipt and if all of their attempts are older or equal to the latest finalized head var oldTxIDs []int64 - for txID, attempts := range txIdToAttemptsMap { + for txID, attempts := range txIDToAttemptsMap { hasReceipt := false hasAttemptAfterFinalizedHead := false for _, attempt := range attempts { diff --git a/core/chains/evm/txmgr/finalizer_test.go b/core/chains/evm/txmgr/finalizer_test.go index d3747f0fc2f..940d52ba1c7 100644 --- a/core/chains/evm/txmgr/finalizer_test.go +++ b/core/chains/evm/txmgr/finalizer_test.go @@ -15,7 +15,6 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/test-go/testify/assert" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" @@ -349,19 +348,17 @@ func TestFinalizer_ResumePendingRuns(t *testing.T) { go func() { defer close(done) err2 := finalizer.ResumePendingTaskRuns(ctx, &head) - if !assert.NoError(t, err2) { - return - } + require.NoError(t, err2) + // Retrieve Tx to check if callback completed flag was set to true updateTx, err3 := txStore.FindTxWithSequence(ctx, fromAddress, nonce) - if assert.NoError(t, err3) { - assert.Equal(t, true, updateTx.CallbackCompleted) - } + require.NoError(t, err3) + require.Equal(t, true, updateTx.CallbackCompleted) }() select { case data := <-ch: - assert.NoError(t, err) + require.NoError(t, err) require.IsType(t, &evmtypes.Receipt{}, data) r := data.(*evmtypes.Receipt) @@ -406,23 +403,21 @@ func TestFinalizer_ResumePendingRuns(t *testing.T) { go func() { defer close(done) err2 := finalizer.ResumePendingTaskRuns(ctx, &head) - if !assert.NoError(t, err2) { - return - } + require.NoError(t, err2) + // Retrieve Tx to check if callback completed flag was set to true updateTx, err3 := txStore.FindTxWithSequence(ctx, fromAddress, nonce) - if assert.NoError(t, err3) { - assert.Equal(t, true, updateTx.CallbackCompleted) - } + require.NoError(t, err3) + require.Equal(t, true, updateTx.CallbackCompleted) }() select { case data := <-ch: - assert.Error(t, data.error) + require.Error(t, data.error) - assert.EqualError(t, data.error, fmt.Sprintf("transaction %s reverted on-chain", etx.TxAttempts[0].Hash.String())) + require.EqualError(t, data.error, fmt.Sprintf("transaction %s reverted on-chain", etx.TxAttempts[0].Hash.String())) - assert.Nil(t, data.value) + require.Nil(t, data.value) case <-time.After(tests.WaitTimeout(t)): t.Fatal("no value received") diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index 190cd75c811..528859eb778 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -3025,9 +3025,9 @@ func (_c *EvmTxStore_UpdateTxAttemptInProgressToBroadcast_Call) RunAndReturn(run return _c } -// UpdateTxCallbackCompleted provides a mock function with given fields: ctx, pipelineTaskRunRid, chainId -func (_m *EvmTxStore) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId *big.Int) error { - ret := _m.Called(ctx, pipelineTaskRunRid, chainId) +// UpdateTxCallbackCompleted provides a mock function with given fields: ctx, pipelineTaskRunRid, chainID +func (_m *EvmTxStore) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainID *big.Int) error { + ret := _m.Called(ctx, pipelineTaskRunRid, chainID) if len(ret) == 0 { panic("no return value specified for UpdateTxCallbackCompleted") @@ -3035,7 +3035,7 @@ func (_m *EvmTxStore) UpdateTxCallbackCompleted(ctx context.Context, pipelineTas var r0 error if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, *big.Int) error); ok { - r0 = rf(ctx, pipelineTaskRunRid, chainId) + r0 = rf(ctx, pipelineTaskRunRid, chainID) } else { r0 = ret.Error(0) } @@ -3051,12 +3051,12 @@ type EvmTxStore_UpdateTxCallbackCompleted_Call struct { // UpdateTxCallbackCompleted is a helper method to define mock.On call // - ctx context.Context // - pipelineTaskRunRid uuid.UUID -// - chainId *big.Int -func (_e *EvmTxStore_Expecter) UpdateTxCallbackCompleted(ctx interface{}, pipelineTaskRunRid interface{}, chainId interface{}) *EvmTxStore_UpdateTxCallbackCompleted_Call { - return &EvmTxStore_UpdateTxCallbackCompleted_Call{Call: _e.mock.On("UpdateTxCallbackCompleted", ctx, pipelineTaskRunRid, chainId)} +// - chainID *big.Int +func (_e *EvmTxStore_Expecter) UpdateTxCallbackCompleted(ctx interface{}, pipelineTaskRunRid interface{}, chainID interface{}) *EvmTxStore_UpdateTxCallbackCompleted_Call { + return &EvmTxStore_UpdateTxCallbackCompleted_Call{Call: _e.mock.On("UpdateTxCallbackCompleted", ctx, pipelineTaskRunRid, chainID)} } -func (_c *EvmTxStore_UpdateTxCallbackCompleted_Call) Run(run func(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId *big.Int)) *EvmTxStore_UpdateTxCallbackCompleted_Call { +func (_c *EvmTxStore_UpdateTxCallbackCompleted_Call) Run(run func(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainID *big.Int)) *EvmTxStore_UpdateTxCallbackCompleted_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(uuid.UUID), args[2].(*big.Int)) }) @@ -3263,9 +3263,9 @@ func (_c *EvmTxStore_UpdateTxForRebroadcast_Call) RunAndReturn(run func(context. return _c } -// UpdateTxStatesToFinalizedUsingTxHashes provides a mock function with given fields: ctx, txHashes, chainId -func (_m *EvmTxStore) UpdateTxStatesToFinalizedUsingTxHashes(ctx context.Context, txHashes []common.Hash, chainId *big.Int) error { - ret := _m.Called(ctx, txHashes, chainId) +// UpdateTxStatesToFinalizedUsingTxHashes provides a mock function with given fields: ctx, txHashes, chainID +func (_m *EvmTxStore) UpdateTxStatesToFinalizedUsingTxHashes(ctx context.Context, txHashes []common.Hash, chainID *big.Int) error { + ret := _m.Called(ctx, txHashes, chainID) if len(ret) == 0 { panic("no return value specified for UpdateTxStatesToFinalizedUsingTxHashes") @@ -3273,7 +3273,7 @@ func (_m *EvmTxStore) UpdateTxStatesToFinalizedUsingTxHashes(ctx context.Context var r0 error if rf, ok := ret.Get(0).(func(context.Context, []common.Hash, *big.Int) error); ok { - r0 = rf(ctx, txHashes, chainId) + r0 = rf(ctx, txHashes, chainID) } else { r0 = ret.Error(0) } @@ -3289,12 +3289,12 @@ type EvmTxStore_UpdateTxStatesToFinalizedUsingTxHashes_Call struct { // UpdateTxStatesToFinalizedUsingTxHashes is a helper method to define mock.On call // - ctx context.Context // - txHashes []common.Hash -// - chainId *big.Int -func (_e *EvmTxStore_Expecter) UpdateTxStatesToFinalizedUsingTxHashes(ctx interface{}, txHashes interface{}, chainId interface{}) *EvmTxStore_UpdateTxStatesToFinalizedUsingTxHashes_Call { - return &EvmTxStore_UpdateTxStatesToFinalizedUsingTxHashes_Call{Call: _e.mock.On("UpdateTxStatesToFinalizedUsingTxHashes", ctx, txHashes, chainId)} +// - chainID *big.Int +func (_e *EvmTxStore_Expecter) UpdateTxStatesToFinalizedUsingTxHashes(ctx interface{}, txHashes interface{}, chainID interface{}) *EvmTxStore_UpdateTxStatesToFinalizedUsingTxHashes_Call { + return &EvmTxStore_UpdateTxStatesToFinalizedUsingTxHashes_Call{Call: _e.mock.On("UpdateTxStatesToFinalizedUsingTxHashes", ctx, txHashes, chainID)} } -func (_c *EvmTxStore_UpdateTxStatesToFinalizedUsingTxHashes_Call) Run(run func(ctx context.Context, txHashes []common.Hash, chainId *big.Int)) *EvmTxStore_UpdateTxStatesToFinalizedUsingTxHashes_Call { +func (_c *EvmTxStore_UpdateTxStatesToFinalizedUsingTxHashes_Call) Run(run func(ctx context.Context, txHashes []common.Hash, chainID *big.Int)) *EvmTxStore_UpdateTxStatesToFinalizedUsingTxHashes_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].([]common.Hash), args[2].(*big.Int)) })