Skip to content

Commit

Permalink
Fix CI
Browse files Browse the repository at this point in the history
  • Loading branch information
dimriou committed Dec 5, 2024
1 parent c6e1f2e commit 129c734
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 23 deletions.
39 changes: 30 additions & 9 deletions core/chains/evm/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
)

const (
broadcastInterval time.Duration = 30 * time.Second
maxInFlightTransactions int = 16
maxInFlightSubset int = 3
maxAllowedAttempts uint16 = 10
broadcastInterval time.Duration = 30 * time.Second
maxInFlightTransactions int = 16
maxInFlightSubset int = 3
maxAllowedAttempts uint16 = 10
pendingNonceDefaultTimeout time.Duration = 30 * time.Second
pendingNonceRecheckInterval time.Duration = 1 * time.Second
)

type Client interface {
Expand Down Expand Up @@ -72,7 +74,7 @@ var (
}, []string{"chainID"})
promNumConfirmedTxs = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "txm_num_confirmed_transactions",
Help: "Total number of confirmed transactions. Note that this can happen multiple times per transaction in the case of re-orgs.",
Help: "Total number of confirmed transactions. Note that this can happen multiple times per transaction in the case of re-orgs or when filling the nonce for untracked transactions.",
}, []string{"chainID"})
promNumNonceGaps = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "txm_num_nonce_gaps",
Expand Down Expand Up @@ -135,7 +137,7 @@ func (t *Txm) Start(ctx context.Context) error {
return err
}
for _, address := range addresses {
err := t.startAddress(address)
err := t.startAddress(ctx, address)
if err != nil {
return err
}
Expand All @@ -144,10 +146,10 @@ func (t *Txm) Start(ctx context.Context) error {
})
}

func (t *Txm) startAddress(address common.Address) error {
func (t *Txm) startAddress(ctx context.Context, address common.Address) error {
triggerCh := make(chan struct{}, 1)
t.triggerCh[address] = triggerCh
pendingNonce, err := t.client.PendingNonceAt(context.TODO(), address)
pendingNonce, err := t.pollForPendingNonce(ctx, address)
if err != nil {
return err
}
Expand All @@ -159,6 +161,24 @@ func (t *Txm) startAddress(address common.Address) error {
return nil
}

func (t *Txm) pollForPendingNonce(ctx context.Context, address common.Address) (pendingNonce uint64, err error) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, pendingNonceDefaultTimeout)
defer cancel()
for {
pendingNonce, err = t.client.PendingNonceAt(ctxWithTimeout, address)
if err != nil {
t.lggr.Errorw("Error when fetching initial pending nonce", "address", address, "err", err)
select {
case <-time.After(pendingNonceRecheckInterval):
case <-ctx.Done():
return 0, context.Cause(ctx)
}
continue
}
return pendingNonce, nil
}
}

func (t *Txm) Close() error {
return t.StopOnce("Txm", func() error {
close(t.stopCh)
Expand Down Expand Up @@ -188,6 +208,7 @@ func (t *Txm) Trigger(address common.Address) {
}

func (t *Txm) Abandon(address common.Address) error {
t.lggr.Infof("Dropping unstarted and unconfirmed transactions for address: %v", address)
return t.txStore.AbandonPendingTransactions(context.TODO(), address)
}

Expand Down Expand Up @@ -350,7 +371,7 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio
return err
}
if pendingNonce <= *tx.Nonce {
t.lggr.Debugf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d", tx.ID, pendingNonce, tx.Nonce)
t.lggr.Debugf("Pending nonce for txID: %v didn't increase. PendingNonce: %d, TxNonce: %d", tx.ID, pendingNonce, *tx.Nonce)
return nil
}
}
Expand Down
23 changes: 16 additions & 7 deletions core/chains/evm/txm/txm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,27 @@ func TestLifecycle(t *testing.T) {

client := mocks.NewClient(t)
ab := mocks.NewAttemptBuilder(t)
config := Config{BlockTime: 10 * time.Millisecond}
address1 := testutils.NewAddress()
address2 := testutils.NewAddress()
assert.NotEqual(t, address1, address2)
addresses := []common.Address{address1, address2}
keystore := mocks.NewKeystore(t)
keystore.On("EnabledAddressesForChain", mock.Anything, mock.Anything).Return(addresses, nil)

t.Run("fails to start if initial pending nonce call fails", func(t *testing.T) {
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, nil, nil, config, keystore)
t.Run("retries if initial pending nonce call fails", func(t *testing.T) {
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
config := Config{BlockTime: 1 * time.Minute}
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
keystore.On("EnabledAddressesForChain", mock.Anything, mock.Anything).Return([]common.Address{address1}, nil).Once()
txm := NewTxm(lggr, testutils.FixtureChainID, client, nil, txStore, nil, config, keystore)
client.On("PendingNonceAt", mock.Anything, address1).Return(uint64(0), errors.New("error")).Once()
require.Error(t, txm.Start(tests.Context(t)))
client.On("PendingNonceAt", mock.Anything, address1).Return(uint64(0), nil).Once()
require.NoError(t, txm.Start(tests.Context(t)))
tests.AssertLogEventually(t, observedLogs, "Error when fetching initial pending nonce")
})

t.Run("tests lifecycle successfully without any transactions", func(t *testing.T) {
config := Config{BlockTime: 200 * time.Millisecond}
keystore.On("EnabledAddressesForChain", mock.Anything, mock.Anything).Return(addresses, nil).Once()
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
require.NoError(t, txStore.Add(addresses...))
Expand All @@ -52,8 +58,8 @@ func TestLifecycle(t *testing.T) {
client.On("PendingNonceAt", mock.Anything, address1).Return(nonce, nil).Once()
client.On("PendingNonceAt", mock.Anything, address2).Return(nonce, nil).Once()
// backfill loop (may or may not be executed multiple times)
client.On("NonceAt", mock.Anything, address1, mock.Anything).Return(nonce, nil)
client.On("NonceAt", mock.Anything, address2, mock.Anything).Return(nonce, nil)
client.On("NonceAt", mock.Anything, address1, mock.Anything).Return(nonce, nil).Maybe()
client.On("NonceAt", mock.Anything, address2, mock.Anything).Return(nonce, nil).Maybe()

servicetest.Run(t, txm)
tests.AssertLogEventually(t, observedLogs, "Backfill time elapsed")
Expand Down Expand Up @@ -85,6 +91,7 @@ func TestTrigger(t *testing.T) {
// Start
client.On("PendingNonceAt", mock.Anything, address).Return(nonce, nil).Once()
servicetest.Run(t, txm)
txm.Trigger(address)
})
}

Expand Down Expand Up @@ -169,6 +176,7 @@ func TestBroadcastTransaction(t *testing.T) {
txm.setNonce(address, 8)
IDK := "IDK"
txRequest := &types.TxRequest{
Data: []byte{100},
IdempotencyKey: &IDK,
ChainID: testutils.FixtureChainID,
FromAddress: address,
Expand All @@ -195,6 +203,7 @@ func TestBroadcastTransaction(t *testing.T) {
var zeroTime time.Time
assert.Greater(t, tx.LastBroadcastAt, zeroTime)
assert.Greater(t, tx.Attempts[0].BroadcastAt, zeroTime)
assert.Greater(t, tx.InitialBroadcastAt, zeroTime)
})
}

Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/txm/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (t *Transaction) PrettyPrint() string {
nonce = strconv.FormatUint(*t.Nonce, 10)
}
return fmt.Sprintf(`{txID:%d, IdempotencyKey:%v, ChainID:%v, Nonce:%s, FromAddress:%v, ToAddress:%v, Value:%v, `+
`Data:%v, SpecifiedGasLimit:%d, CreatedAt:%v, InitialBroadcastAt:%v, LastBroadcastAt:%v, State:%v, IsPurgeable:%v, AttemptCount:%d, `+
`Data:%s, SpecifiedGasLimit:%d, CreatedAt:%v, InitialBroadcastAt:%v, LastBroadcastAt:%v, State:%v, IsPurgeable:%v, AttemptCount:%d, `+
`Meta:%v, Subject:%v}`,
t.ID, idk, t.ChainID, nonce, t.FromAddress, t.ToAddress, t.Value, t.Data, t.SpecifiedGasLimit, t.CreatedAt, t.InitialBroadcastAt,
t.LastBroadcastAt, t.State, t.IsPurgeable, t.AttemptCount, t.Meta, t.Subject)
Expand Down
12 changes: 6 additions & 6 deletions core/web/resolver/testdata/config-multi-chain-effective.toml
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ RPCBlockQueryDelay = 1
FinalizedBlockOffset = 0
NoNewFinalizedHeadsThreshold = '9m0s'

[EVM.TxmV2]
Enabled = false

[EVM.Transactions]
ForwardersEnabled = false
MaxInFlight = 16
Expand All @@ -321,9 +324,6 @@ ReaperInterval = '1h0m0s'
ReaperThreshold = '168h0m0s'
ResendAfterThreshold = '1m0s'

[EVM.TxmV2]
Enabled = false

[EVM.Transactions.AutoPurge]
Enabled = false

Expand Down Expand Up @@ -533,6 +533,9 @@ RPCBlockQueryDelay = 10
FinalizedBlockOffset = 0
NoNewFinalizedHeadsThreshold = '6m0s'

[EVM.TxmV2]
Enabled = false

[EVM.Transactions]
ForwardersEnabled = false
MaxInFlight = 16
Expand All @@ -541,9 +544,6 @@ ReaperInterval = '1h0m0s'
ReaperThreshold = '168h0m0s'
ResendAfterThreshold = '1m0s'

[EVM.TxmV2]
Enabled = false

[EVM.Transactions.AutoPurge]
Enabled = false

Expand Down

0 comments on commit 129c734

Please sign in to comment.