From bc4d56c72161f4f2131c7c0bf1d9634f60049dee Mon Sep 17 00:00:00 2001 From: Silas Lenihan Date: Thu, 19 Dec 2024 13:15:15 -0500 Subject: [PATCH] solved conflicts between TXM changes and unit test changes --- pkg/solana/chain.go | 2 +- pkg/solana/chainwriter/chain_writer.go | 2 +- pkg/solana/chainwriter/chain_writer_test.go | 2 +- pkg/solana/relay.go | 2 +- pkg/solana/transmitter_test.go | 2 +- pkg/solana/txm/mocks/tx_manager.go | 25 +++++++++--------- pkg/solana/txm/pendingtx.go | 7 +++--- pkg/solana/txm/pendingtx_test.go | 28 ++++++++++----------- pkg/solana/txm/txm.go | 24 +++++++++--------- pkg/solana/txm/txm_internal_test.go | 4 +-- 10 files changed, 50 insertions(+), 48 deletions(-) diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index 722f99bf5..0ccdf0e46 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -577,7 +577,7 @@ func (c *chain) sendTx(ctx context.Context, from, to string, amount *big.Int, ba chainTxm := c.TxManager() err = chainTxm.Enqueue(ctx, "", tx, nil, blockhash.Value.LastValidBlockHeight, - txm.SetComputeUnitLimit(500), // reduce from default 200K limit - should only take 450 compute units + txmutils.SetComputeUnitLimit(500), // reduce from default 200K limit - should only take 450 compute units // no fee bumping and no additional fee - makes validating balance accurate txmutils.SetComputeUnitPriceMax(0), txmutils.SetComputeUnitPriceMin(0), diff --git a/pkg/solana/chainwriter/chain_writer.go b/pkg/solana/chainwriter/chain_writer.go index e07842201..8616b9f62 100644 --- a/pkg/solana/chainwriter/chain_writer.go +++ b/pkg/solana/chainwriter/chain_writer.go @@ -274,7 +274,7 @@ func (s *SolanaChainWriterService) SubmitTransaction(ctx context.Context, contra } // Enqueue transaction - if err = s.txm.Enqueue(ctx, accounts[0].PublicKey.String(), tx, &transactionID); err != nil { + if err = s.txm.Enqueue(ctx, accounts[0].PublicKey.String(), tx, &transactionID, blockhash.Value.LastValidBlockHeight); err != nil { return errorWithDebugID(fmt.Errorf("error enqueuing transaction: %w", err), debugID) } diff --git a/pkg/solana/chainwriter/chain_writer_test.go b/pkg/solana/chainwriter/chain_writer_test.go index c7fab54e0..ef7b399af 100644 --- a/pkg/solana/chainwriter/chain_writer_test.go +++ b/pkg/solana/chainwriter/chain_writer_test.go @@ -541,7 +541,7 @@ func TestChainWriter_SubmitTransaction(t *testing.T) { require.Len(t, tx.Message.AddressTableLookups, 1) // address table look contains entry require.Equal(t, derivedLookupTablePubkey, tx.Message.AddressTableLookups[0].AccountKey) // address table return true - }), &txID).Return(nil).Once() + }), &txID, mock.Anything).Return(nil).Once() args := map[string]interface{}{ "lookupTable": chainwriter.GetRandomPubKey(t).Bytes(), diff --git a/pkg/solana/relay.go b/pkg/solana/relay.go index 7b5c3b39f..5dfe8c836 100644 --- a/pkg/solana/relay.go +++ b/pkg/solana/relay.go @@ -33,7 +33,7 @@ type TxManager interface { // - txCfgs can be used to set custom tx configurations. // - If a txID is provided, it will be used to identify the tx. Otherwise, a random UUID will be generated. // - The caller needs to set the tx.Message.RecentBlockhash and provide the corresponding lastValidBlockHeight. These values are obtained from the GetLatestBlockhash RPC call. - Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, lastValidBlockHeight uint64, txCfgs ...txm.SetTxConfig) error + Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, lastValidBlockHeight uint64, txCfgs ...txmutils.SetTxConfig) error } var _ relaytypes.Relayer = &Relayer{} //nolint:staticcheck diff --git a/pkg/solana/transmitter_test.go b/pkg/solana/transmitter_test.go index 71d9d7bc3..ba1ec9cc3 100644 --- a/pkg/solana/transmitter_test.go +++ b/pkg/solana/transmitter_test.go @@ -27,7 +27,7 @@ type verifyTxSize struct { s *solana.PrivateKey } -func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, txID *string, _ uint64, _ ...txm.SetTxConfig) error { +func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, txID *string, _ uint64, _ ...txmutils.SetTxConfig) error { // additional components that transaction manager adds to the transaction require.NoError(txm.t, fees.SetComputeUnitPrice(tx, 0)) require.NoError(txm.t, fees.SetComputeUnitLimit(tx, 0)) diff --git a/pkg/solana/txm/mocks/tx_manager.go b/pkg/solana/txm/mocks/tx_manager.go index 50806a4da..7694703a3 100644 --- a/pkg/solana/txm/mocks/tx_manager.go +++ b/pkg/solana/txm/mocks/tx_manager.go @@ -71,14 +71,14 @@ func (_c *TxManager_Close_Call) RunAndReturn(run func() error) *TxManager_Close_ return _c } -// Enqueue provides a mock function with given fields: ctx, accountID, tx, txID, txCfgs -func (_m *TxManager) Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, txCfgs ...utils.SetTxConfig) error { +// Enqueue provides a mock function with given fields: ctx, accountID, tx, txID, txLastValidBlockHeight, txCfgs +func (_m *TxManager) Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, txLastValidBlockHeight uint64, txCfgs ...utils.SetTxConfig) error { _va := make([]interface{}, len(txCfgs)) for _i := range txCfgs { _va[_i] = txCfgs[_i] } var _ca []interface{} - _ca = append(_ca, ctx, accountID, tx, txID) + _ca = append(_ca, ctx, accountID, tx, txID, txLastValidBlockHeight) _ca = append(_ca, _va...) ret := _m.Called(_ca...) @@ -87,8 +87,8 @@ func (_m *TxManager) Enqueue(ctx context.Context, accountID string, tx *solana.T } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, *solana.Transaction, *string, ...utils.SetTxConfig) error); ok { - r0 = rf(ctx, accountID, tx, txID, txCfgs...) + if rf, ok := ret.Get(0).(func(context.Context, string, *solana.Transaction, *string, uint64, ...utils.SetTxConfig) error); ok { + r0 = rf(ctx, accountID, tx, txID, txLastValidBlockHeight, txCfgs...) } else { r0 = ret.Error(0) } @@ -106,21 +106,22 @@ type TxManager_Enqueue_Call struct { // - accountID string // - tx *solana.Transaction // - txID *string +// - txLastValidBlockHeight uint64 // - txCfgs ...utils.SetTxConfig -func (_e *TxManager_Expecter) Enqueue(ctx interface{}, accountID interface{}, tx interface{}, txID interface{}, txCfgs ...interface{}) *TxManager_Enqueue_Call { +func (_e *TxManager_Expecter) Enqueue(ctx interface{}, accountID interface{}, tx interface{}, txID interface{}, txLastValidBlockHeight interface{}, txCfgs ...interface{}) *TxManager_Enqueue_Call { return &TxManager_Enqueue_Call{Call: _e.mock.On("Enqueue", - append([]interface{}{ctx, accountID, tx, txID}, txCfgs...)...)} + append([]interface{}{ctx, accountID, tx, txID, txLastValidBlockHeight}, txCfgs...)...)} } -func (_c *TxManager_Enqueue_Call) Run(run func(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, txCfgs ...utils.SetTxConfig)) *TxManager_Enqueue_Call { +func (_c *TxManager_Enqueue_Call) Run(run func(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, txLastValidBlockHeight uint64, txCfgs ...utils.SetTxConfig)) *TxManager_Enqueue_Call { _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]utils.SetTxConfig, len(args)-4) - for i, a := range args[4:] { + variadicArgs := make([]utils.SetTxConfig, len(args)-5) + for i, a := range args[5:] { if a != nil { variadicArgs[i] = a.(utils.SetTxConfig) } } - run(args[0].(context.Context), args[1].(string), args[2].(*solana.Transaction), args[3].(*string), variadicArgs...) + run(args[0].(context.Context), args[1].(string), args[2].(*solana.Transaction), args[3].(*string), args[4].(uint64), variadicArgs...) }) return _c } @@ -130,7 +131,7 @@ func (_c *TxManager_Enqueue_Call) Return(_a0 error) *TxManager_Enqueue_Call { return _c } -func (_c *TxManager_Enqueue_Call) RunAndReturn(run func(context.Context, string, *solana.Transaction, *string, ...utils.SetTxConfig) error) *TxManager_Enqueue_Call { +func (_c *TxManager_Enqueue_Call) RunAndReturn(run func(context.Context, string, *solana.Transaction, *string, uint64, ...utils.SetTxConfig) error) *TxManager_Enqueue_Call { _c.Call.Return(run) return _c } diff --git a/pkg/solana/txm/pendingtx.go b/pkg/solana/txm/pendingtx.go index 3b60f0248..7784c47cd 100644 --- a/pkg/solana/txm/pendingtx.go +++ b/pkg/solana/txm/pendingtx.go @@ -11,6 +11,7 @@ import ( "golang.org/x/exp/maps" "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/utils" + txmutils "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/utils" ) var ( @@ -54,11 +55,11 @@ type PendingTxContext interface { // finishedTx is used to store info required to track transactions to finality or error type pendingTx struct { tx solana.Transaction - cfg TxConfig + cfg txmutils.TxConfig signatures []solana.Signature id string createTs time.Time - state TxState + state txmutils.TxState lastValidBlockHeight uint64 // to track expiration, equivalent to last valid block number. } @@ -234,7 +235,7 @@ func (c *pendingTxContext) ListAllExpiredBroadcastedTxs(currBlockNumber uint64) defer c.lock.RUnlock() expiredBroadcastedTxs := make([]pendingTx, 0, len(c.broadcastedProcessedTxs)) // worst case, all of them for _, tx := range c.broadcastedProcessedTxs { - if tx.state == Broadcasted && tx.lastValidBlockHeight < currBlockNumber { + if tx.state == txmutils.Broadcasted && tx.lastValidBlockHeight < currBlockNumber { expiredBroadcastedTxs = append(expiredBroadcastedTxs, tx) } } diff --git a/pkg/solana/txm/pendingtx_test.go b/pkg/solana/txm/pendingtx_test.go index ea8e65162..b082b2162 100644 --- a/pkg/solana/txm/pendingtx_test.go +++ b/pkg/solana/txm/pendingtx_test.go @@ -94,7 +94,7 @@ func TestPendingTxContext_new(t *testing.T) { require.Equal(t, sig, tx.signatures[0], "signature should match") // Check status is Broadcasted - require.Equal(t, Broadcasted, tx.state, "transaction state should be Broadcasted") + require.Equal(t, utils.Broadcasted, tx.state, "transaction state should be Broadcasted") // Check it does not exist in confirmed nor finalized maps _, exists = txs.confirmedTxs[msg.id] @@ -1195,12 +1195,12 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) { setup: func(t *testing.T, ctx *pendingTxContext) { tx1 := pendingTx{ id: "tx1", - state: Broadcasted, + state: utils.Broadcasted, lastValidBlockHeight: 1500, } tx2 := pendingTx{ id: "tx2", - state: Broadcasted, + state: utils.Broadcasted, lastValidBlockHeight: 1600, } ctx.broadcastedProcessedTxs["tx1"] = tx1 @@ -1214,17 +1214,17 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) { setup: func(t *testing.T, ctx *pendingTxContext) { tx1 := pendingTx{ id: "tx1", - state: Broadcasted, + state: utils.Broadcasted, lastValidBlockHeight: 1000, } tx2 := pendingTx{ id: "tx2", - state: Broadcasted, + state: utils.Broadcasted, lastValidBlockHeight: 1500, } tx3 := pendingTx{ id: "tx3", - state: Broadcasted, + state: utils.Broadcasted, lastValidBlockHeight: 900, } ctx.broadcastedProcessedTxs["tx1"] = tx1 @@ -1239,12 +1239,12 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) { setup: func(t *testing.T, ctx *pendingTxContext) { tx1 := pendingTx{ id: "tx1", - state: Broadcasted, + state: utils.Broadcasted, lastValidBlockHeight: 1000, } tx2 := pendingTx{ id: "tx2", - state: Broadcasted, + state: utils.Broadcasted, lastValidBlockHeight: 1500, } ctx.broadcastedProcessedTxs["tx1"] = tx1 @@ -1258,17 +1258,17 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) { setup: func(t *testing.T, ctx *pendingTxContext) { tx1 := pendingTx{ id: "tx1", - state: Broadcasted, + state: utils.Broadcasted, lastValidBlockHeight: 800, } tx2 := pendingTx{ id: "tx2", - state: Processed, // Not Broadcasted + state: utils.Processed, // Not Broadcasted lastValidBlockHeight: 700, } tx3 := pendingTx{ id: "tx3", - state: Processed, // Not Broadcasted + state: utils.Processed, // Not Broadcasted lastValidBlockHeight: 600, } ctx.broadcastedProcessedTxs["tx1"] = tx1 @@ -1283,17 +1283,17 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) { setup: func(t *testing.T, ctx *pendingTxContext) { tx1 := pendingTx{ id: "tx1", - state: Broadcasted, + state: utils.Broadcasted, lastValidBlockHeight: 1000, } tx2 := pendingTx{ id: "tx2", - state: Broadcasted, + state: utils.Broadcasted, lastValidBlockHeight: 999, } tx3 := pendingTx{ id: "tx3", - state: Broadcasted, + state: utils.Broadcasted, lastValidBlockHeight: 1, } ctx.broadcastedProcessedTxs["tx1"] = tx1 diff --git a/pkg/solana/txm/txm.go b/pkg/solana/txm/txm.go index 8c69e1c30..c87089060 100644 --- a/pkg/solana/txm/txm.go +++ b/pkg/solana/txm/txm.go @@ -46,7 +46,7 @@ var _ loop.Keystore = (SimpleKeystore)(nil) type TxManager interface { services.Service - Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txID *string, txCfgs ...txmutils.SetTxConfig) error + Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txID *string, txLastValidBlockHeight uint64, txCfgs ...txmutils.SetTxConfig) error GetTransactionStatus(ctx context.Context, transactionID string) (commontypes.TransactionStatus, error) } @@ -189,7 +189,7 @@ func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Tran if initSendErr != nil { // Do not retry and exit early if fails cancel() - stateTransitionErr := txm.txs.OnPrebroadcastError(msg.id, txm.cfg.TxRetentionTimeout(), Errored, TxFailReject) + stateTransitionErr := txm.txs.OnPrebroadcastError(msg.id, txm.cfg.TxRetentionTimeout(), txmutils.Errored, TxFailReject) return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("tx failed initial transmit: %w", errors.Join(initSendErr, stateTransitionErr)) } @@ -202,7 +202,7 @@ func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Tran txm.lggr.Debugw("tx initial broadcast", "id", msg.id, "fee", msg.cfg.BaseComputeUnitPrice, "signature", sig, "lastValidBlockHeight", msg.lastValidBlockHeight) // Initialize signature list with initialTx signature. This list will be used to add new signatures and track retry attempts. - sigs := &signatureList{} + sigs := &txmutils.SignatureList{} sigs.Allocate() if initSetErr := sigs.Set(0, sig); initSetErr != nil { return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("failed to save initial signature in signature list: %w", initSetErr) @@ -263,7 +263,7 @@ func (txm *Txm) buildTx(ctx context.Context, msg pendingTx, retryCount int) (sol // retryTx contains the logic for retrying the transaction, including exponential backoff and fee bumping. // Retries until context cancelled by timeout or called externally. // It uses handleRetry helper function to handle each retry attempt. -func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.Transaction, sigs *signatureList) { +func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.Transaction, sigs *txmutils.SignatureList) { deltaT := 1 // initial delay in ms tick := time.After(0) bumpCount := 0 @@ -300,7 +300,7 @@ func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.T } // Start a goroutine to handle the retry attempt - // takes currentTx and rebroadcast. If needs bumping it will new signature to already allocated space in signatureList. + // takes currentTx and rebroadcast. If needs bumping it will new signature to already allocated space in txmutils.SignatureList. wg.Add(1) go func(bump bool, count int, retryTx solanaGo.Transaction) { defer wg.Done() @@ -318,7 +318,7 @@ func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.T } // handleRetry handles the logic for each retry attempt, including sending the transaction, updating signatures, and logging. -func (txm *Txm) handleRetry(ctx context.Context, msg pendingTx, bump bool, count int, retryTx solanaGo.Transaction, sigs *signatureList) { +func (txm *Txm) handleRetry(ctx context.Context, msg pendingTx, bump bool, count int, retryTx solanaGo.Transaction, sigs *txmutils.SignatureList) { // send retry transaction retrySig, err := txm.sendTx(ctx, &retryTx) if err != nil { @@ -420,7 +420,7 @@ func (txm *Txm) processConfirmations(ctx context.Context, client client.ReaderWr defer wg.Done() // to process successful first - sortedSigs, sortedRes, err := SortSignaturesAndResults(sigsBatch[i], statuses) + sortedSigs, sortedRes, err := txmutils.SortSignaturesAndResults(sigsBatch[i], statuses) if err != nil { txm.lggr.Errorw("sorting error", "error", err) return @@ -468,7 +468,7 @@ func (txm *Txm) processConfirmations(ctx context.Context, client client.ReaderWr func (txm *Txm) handleNotFoundSignatureStatus(sig solanaGo.Signature) { txm.lggr.Debugw("tx state: not found", "signature", sig) if txm.cfg.TxConfirmTimeout() != 0*time.Second && txm.txs.Expired(sig, txm.cfg.TxConfirmTimeout()) { - id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), Errored, TxFailDrop) + id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), txmutils.Errored, TxFailDrop) if err != nil { txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err) } else { @@ -512,7 +512,7 @@ func (txm *Txm) handleProcessedSignatureStatus(sig solanaGo.Signature) { } // check confirm timeout exceeded if TxConfirmTimeout set if txm.cfg.TxConfirmTimeout() != 0*time.Second && txm.txs.Expired(sig, txm.cfg.TxConfirmTimeout()) { - id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), Errored, TxFailDrop) + id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), txmutils.Errored, TxFailDrop) if err != nil { txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err) } else { @@ -591,7 +591,7 @@ func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderW // call sendWithRetry directly to avoid enqueuing _, _, _, sendErr := txm.sendWithRetry(ctx, rebroadcastTx) if sendErr != nil { - stateTransitionErr := txm.txs.OnPrebroadcastError(tx.id, txm.cfg.TxRetentionTimeout(), Errored, TxFailReject) + stateTransitionErr := txm.txs.OnPrebroadcastError(tx.id, txm.cfg.TxRetentionTimeout(), txmutils.Errored, TxFailReject) txm.lggr.Errorw("failed to rebroadcast transaction", "id", tx.id, "error", errors.Join(sendErr, stateTransitionErr)) continue } @@ -667,7 +667,7 @@ func (txm *Txm) reap() { } // Enqueue enqueues a msg destined for the solana chain. -func (txm *Txm) Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txID *string, txLastValidBlockHeight uint64, txCfgs ...SetTxConfig) error { +func (txm *Txm) Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txID *string, txLastValidBlockHeight uint64, txCfgs ...txmutils.SetTxConfig) error { if err := txm.Ready(); err != nil { return fmt.Errorf("error in soltxm.Enqueue: %w", err) } @@ -834,7 +834,7 @@ func (txm *Txm) simulateTx(ctx context.Context, tx *solanaGo.Transaction) (res * } // ProcessError parses and handles relevant errors found in simulation results -func (txm *Txm) ProcessError(sig solanaGo.Signature, resErr interface{}, simulation bool) (txState TxState, errType TxErrType) { +func (txm *Txm) ProcessError(sig solanaGo.Signature, resErr interface{}, simulation bool) (txState txmutils.TxState, errType TxErrType) { if resErr != nil { // handle various errors // https://github.com/solana-labs/solana/blob/master/sdk/src/transaction/error.rs diff --git a/pkg/solana/txm/txm_internal_test.go b/pkg/solana/txm/txm_internal_test.go index 740c2b500..15e4631a3 100644 --- a/pkg/solana/txm/txm_internal_test.go +++ b/pkg/solana/txm/txm_internal_test.go @@ -688,7 +688,7 @@ func TestTxm(t *testing.T) { // send tx - with disabled fee bumping testTxID := uuid.New().String() lastValidBlockHeight := uint64(100) - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, lastValidBlockHeight, SetFeeBumpPeriod(0))) + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, lastValidBlockHeight, txmutils.SetFeeBumpPeriod(0))) wg.Wait() // no transactions stored inflight txs list @@ -741,7 +741,7 @@ func TestTxm(t *testing.T) { // send tx - with disabled fee bumping and disabled compute unit limit testTxID := uuid.New().String() lastValidBlockHeight := uint64(100) - assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, lastValidBlockHeight, SetFeeBumpPeriod(0), SetComputeUnitLimit(0))) + assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, lastValidBlockHeight, txmutils.SetFeeBumpPeriod(0), txmutils.SetComputeUnitLimit(0))) wg.Wait() // no transactions stored inflight txs list