Skip to content

Commit

Permalink
solved conflicts between TXM changes and unit test changes
Browse files Browse the repository at this point in the history
  • Loading branch information
silaslenihan committed Dec 19, 2024
1 parent 4560f68 commit bc4d56c
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 48 deletions.
2 changes: 1 addition & 1 deletion pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (c *chain) sendTx(ctx context.Context, from, to string, amount *big.Int, ba

chainTxm := c.TxManager()
err = chainTxm.Enqueue(ctx, "", tx, nil, blockhash.Value.LastValidBlockHeight,
txm.SetComputeUnitLimit(500), // reduce from default 200K limit - should only take 450 compute units
txmutils.SetComputeUnitLimit(500), // reduce from default 200K limit - should only take 450 compute units
// no fee bumping and no additional fee - makes validating balance accurate
txmutils.SetComputeUnitPriceMax(0),
txmutils.SetComputeUnitPriceMin(0),
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/chainwriter/chain_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (s *SolanaChainWriterService) SubmitTransaction(ctx context.Context, contra
}

// Enqueue transaction
if err = s.txm.Enqueue(ctx, accounts[0].PublicKey.String(), tx, &transactionID); err != nil {
if err = s.txm.Enqueue(ctx, accounts[0].PublicKey.String(), tx, &transactionID, blockhash.Value.LastValidBlockHeight); err != nil {
return errorWithDebugID(fmt.Errorf("error enqueuing transaction: %w", err), debugID)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/chainwriter/chain_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func TestChainWriter_SubmitTransaction(t *testing.T) {
require.Len(t, tx.Message.AddressTableLookups, 1) // address table look contains entry
require.Equal(t, derivedLookupTablePubkey, tx.Message.AddressTableLookups[0].AccountKey) // address table
return true
}), &txID).Return(nil).Once()
}), &txID, mock.Anything).Return(nil).Once()

args := map[string]interface{}{
"lookupTable": chainwriter.GetRandomPubKey(t).Bytes(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type TxManager interface {
// - txCfgs can be used to set custom tx configurations.
// - If a txID is provided, it will be used to identify the tx. Otherwise, a random UUID will be generated.
// - The caller needs to set the tx.Message.RecentBlockhash and provide the corresponding lastValidBlockHeight. These values are obtained from the GetLatestBlockhash RPC call.
Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, lastValidBlockHeight uint64, txCfgs ...txm.SetTxConfig) error
Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, lastValidBlockHeight uint64, txCfgs ...txmutils.SetTxConfig) error
}

var _ relaytypes.Relayer = &Relayer{} //nolint:staticcheck
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type verifyTxSize struct {
s *solana.PrivateKey
}

func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, txID *string, _ uint64, _ ...txm.SetTxConfig) error {
func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, txID *string, _ uint64, _ ...txmutils.SetTxConfig) error {
// additional components that transaction manager adds to the transaction
require.NoError(txm.t, fees.SetComputeUnitPrice(tx, 0))
require.NoError(txm.t, fees.SetComputeUnitLimit(tx, 0))
Expand Down
25 changes: 13 additions & 12 deletions pkg/solana/txm/mocks/tx_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions pkg/solana/txm/pendingtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"golang.org/x/exp/maps"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/utils"
txmutils "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/utils"
)

var (
Expand Down Expand Up @@ -54,11 +55,11 @@ type PendingTxContext interface {
// finishedTx is used to store info required to track transactions to finality or error
type pendingTx struct {
tx solana.Transaction
cfg TxConfig
cfg txmutils.TxConfig
signatures []solana.Signature
id string
createTs time.Time
state TxState
state txmutils.TxState
lastValidBlockHeight uint64 // to track expiration, equivalent to last valid block number.
}

Expand Down Expand Up @@ -234,7 +235,7 @@ func (c *pendingTxContext) ListAllExpiredBroadcastedTxs(currBlockNumber uint64)
defer c.lock.RUnlock()
expiredBroadcastedTxs := make([]pendingTx, 0, len(c.broadcastedProcessedTxs)) // worst case, all of them
for _, tx := range c.broadcastedProcessedTxs {
if tx.state == Broadcasted && tx.lastValidBlockHeight < currBlockNumber {
if tx.state == txmutils.Broadcasted && tx.lastValidBlockHeight < currBlockNumber {
expiredBroadcastedTxs = append(expiredBroadcastedTxs, tx)
}
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/solana/txm/pendingtx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestPendingTxContext_new(t *testing.T) {
require.Equal(t, sig, tx.signatures[0], "signature should match")

// Check status is Broadcasted
require.Equal(t, Broadcasted, tx.state, "transaction state should be Broadcasted")
require.Equal(t, utils.Broadcasted, tx.state, "transaction state should be Broadcasted")

// Check it does not exist in confirmed nor finalized maps
_, exists = txs.confirmedTxs[msg.id]
Expand Down Expand Up @@ -1195,12 +1195,12 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) {
setup: func(t *testing.T, ctx *pendingTxContext) {
tx1 := pendingTx{
id: "tx1",
state: Broadcasted,
state: utils.Broadcasted,
lastValidBlockHeight: 1500,
}
tx2 := pendingTx{
id: "tx2",
state: Broadcasted,
state: utils.Broadcasted,
lastValidBlockHeight: 1600,
}
ctx.broadcastedProcessedTxs["tx1"] = tx1
Expand All @@ -1214,17 +1214,17 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) {
setup: func(t *testing.T, ctx *pendingTxContext) {
tx1 := pendingTx{
id: "tx1",
state: Broadcasted,
state: utils.Broadcasted,
lastValidBlockHeight: 1000,
}
tx2 := pendingTx{
id: "tx2",
state: Broadcasted,
state: utils.Broadcasted,
lastValidBlockHeight: 1500,
}
tx3 := pendingTx{
id: "tx3",
state: Broadcasted,
state: utils.Broadcasted,
lastValidBlockHeight: 900,
}
ctx.broadcastedProcessedTxs["tx1"] = tx1
Expand All @@ -1239,12 +1239,12 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) {
setup: func(t *testing.T, ctx *pendingTxContext) {
tx1 := pendingTx{
id: "tx1",
state: Broadcasted,
state: utils.Broadcasted,
lastValidBlockHeight: 1000,
}
tx2 := pendingTx{
id: "tx2",
state: Broadcasted,
state: utils.Broadcasted,
lastValidBlockHeight: 1500,
}
ctx.broadcastedProcessedTxs["tx1"] = tx1
Expand All @@ -1258,17 +1258,17 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) {
setup: func(t *testing.T, ctx *pendingTxContext) {
tx1 := pendingTx{
id: "tx1",
state: Broadcasted,
state: utils.Broadcasted,
lastValidBlockHeight: 800,
}
tx2 := pendingTx{
id: "tx2",
state: Processed, // Not Broadcasted
state: utils.Processed, // Not Broadcasted
lastValidBlockHeight: 700,
}
tx3 := pendingTx{
id: "tx3",
state: Processed, // Not Broadcasted
state: utils.Processed, // Not Broadcasted
lastValidBlockHeight: 600,
}
ctx.broadcastedProcessedTxs["tx1"] = tx1
Expand All @@ -1283,17 +1283,17 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) {
setup: func(t *testing.T, ctx *pendingTxContext) {
tx1 := pendingTx{
id: "tx1",
state: Broadcasted,
state: utils.Broadcasted,
lastValidBlockHeight: 1000,
}
tx2 := pendingTx{
id: "tx2",
state: Broadcasted,
state: utils.Broadcasted,
lastValidBlockHeight: 999,
}
tx3 := pendingTx{
id: "tx3",
state: Broadcasted,
state: utils.Broadcasted,
lastValidBlockHeight: 1,
}
ctx.broadcastedProcessedTxs["tx1"] = tx1
Expand Down
24 changes: 12 additions & 12 deletions pkg/solana/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ loop.Keystore = (SimpleKeystore)(nil)

type TxManager interface {
services.Service
Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txID *string, txCfgs ...txmutils.SetTxConfig) error
Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txID *string, txLastValidBlockHeight uint64, txCfgs ...txmutils.SetTxConfig) error
GetTransactionStatus(ctx context.Context, transactionID string) (commontypes.TransactionStatus, error)
}

Expand Down Expand Up @@ -189,7 +189,7 @@ func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Tran
if initSendErr != nil {
// Do not retry and exit early if fails
cancel()
stateTransitionErr := txm.txs.OnPrebroadcastError(msg.id, txm.cfg.TxRetentionTimeout(), Errored, TxFailReject)
stateTransitionErr := txm.txs.OnPrebroadcastError(msg.id, txm.cfg.TxRetentionTimeout(), txmutils.Errored, TxFailReject)
return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("tx failed initial transmit: %w", errors.Join(initSendErr, stateTransitionErr))
}

Expand All @@ -202,7 +202,7 @@ func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Tran
txm.lggr.Debugw("tx initial broadcast", "id", msg.id, "fee", msg.cfg.BaseComputeUnitPrice, "signature", sig, "lastValidBlockHeight", msg.lastValidBlockHeight)

// Initialize signature list with initialTx signature. This list will be used to add new signatures and track retry attempts.
sigs := &signatureList{}
sigs := &txmutils.SignatureList{}
sigs.Allocate()
if initSetErr := sigs.Set(0, sig); initSetErr != nil {
return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("failed to save initial signature in signature list: %w", initSetErr)
Expand Down Expand Up @@ -263,7 +263,7 @@ func (txm *Txm) buildTx(ctx context.Context, msg pendingTx, retryCount int) (sol
// retryTx contains the logic for retrying the transaction, including exponential backoff and fee bumping.
// Retries until context cancelled by timeout or called externally.
// It uses handleRetry helper function to handle each retry attempt.
func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.Transaction, sigs *signatureList) {
func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.Transaction, sigs *txmutils.SignatureList) {
deltaT := 1 // initial delay in ms
tick := time.After(0)
bumpCount := 0
Expand Down Expand Up @@ -300,7 +300,7 @@ func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.T
}

// Start a goroutine to handle the retry attempt
// takes currentTx and rebroadcast. If needs bumping it will new signature to already allocated space in signatureList.
// takes currentTx and rebroadcast. If needs bumping it will new signature to already allocated space in txmutils.SignatureList.
wg.Add(1)
go func(bump bool, count int, retryTx solanaGo.Transaction) {
defer wg.Done()
Expand All @@ -318,7 +318,7 @@ func (txm *Txm) retryTx(ctx context.Context, msg pendingTx, currentTx solanaGo.T
}

// handleRetry handles the logic for each retry attempt, including sending the transaction, updating signatures, and logging.
func (txm *Txm) handleRetry(ctx context.Context, msg pendingTx, bump bool, count int, retryTx solanaGo.Transaction, sigs *signatureList) {
func (txm *Txm) handleRetry(ctx context.Context, msg pendingTx, bump bool, count int, retryTx solanaGo.Transaction, sigs *txmutils.SignatureList) {
// send retry transaction
retrySig, err := txm.sendTx(ctx, &retryTx)
if err != nil {
Expand Down Expand Up @@ -420,7 +420,7 @@ func (txm *Txm) processConfirmations(ctx context.Context, client client.ReaderWr
defer wg.Done()

// to process successful first
sortedSigs, sortedRes, err := SortSignaturesAndResults(sigsBatch[i], statuses)
sortedSigs, sortedRes, err := txmutils.SortSignaturesAndResults(sigsBatch[i], statuses)
if err != nil {
txm.lggr.Errorw("sorting error", "error", err)
return
Expand Down Expand Up @@ -468,7 +468,7 @@ func (txm *Txm) processConfirmations(ctx context.Context, client client.ReaderWr
func (txm *Txm) handleNotFoundSignatureStatus(sig solanaGo.Signature) {
txm.lggr.Debugw("tx state: not found", "signature", sig)
if txm.cfg.TxConfirmTimeout() != 0*time.Second && txm.txs.Expired(sig, txm.cfg.TxConfirmTimeout()) {
id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), Errored, TxFailDrop)
id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), txmutils.Errored, TxFailDrop)
if err != nil {
txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err)
} else {
Expand Down Expand Up @@ -512,7 +512,7 @@ func (txm *Txm) handleProcessedSignatureStatus(sig solanaGo.Signature) {
}
// check confirm timeout exceeded if TxConfirmTimeout set
if txm.cfg.TxConfirmTimeout() != 0*time.Second && txm.txs.Expired(sig, txm.cfg.TxConfirmTimeout()) {
id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), Errored, TxFailDrop)
id, err := txm.txs.OnError(sig, txm.cfg.TxRetentionTimeout(), txmutils.Errored, TxFailDrop)
if err != nil {
txm.lggr.Infow("failed to mark transaction as errored", "id", id, "signature", sig, "timeoutSeconds", txm.cfg.TxConfirmTimeout(), "error", err)
} else {
Expand Down Expand Up @@ -591,7 +591,7 @@ func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderW
// call sendWithRetry directly to avoid enqueuing
_, _, _, sendErr := txm.sendWithRetry(ctx, rebroadcastTx)
if sendErr != nil {
stateTransitionErr := txm.txs.OnPrebroadcastError(tx.id, txm.cfg.TxRetentionTimeout(), Errored, TxFailReject)
stateTransitionErr := txm.txs.OnPrebroadcastError(tx.id, txm.cfg.TxRetentionTimeout(), txmutils.Errored, TxFailReject)
txm.lggr.Errorw("failed to rebroadcast transaction", "id", tx.id, "error", errors.Join(sendErr, stateTransitionErr))
continue
}
Expand Down Expand Up @@ -667,7 +667,7 @@ func (txm *Txm) reap() {
}

// Enqueue enqueues a msg destined for the solana chain.
func (txm *Txm) Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txID *string, txLastValidBlockHeight uint64, txCfgs ...SetTxConfig) error {
func (txm *Txm) Enqueue(ctx context.Context, accountID string, tx *solanaGo.Transaction, txID *string, txLastValidBlockHeight uint64, txCfgs ...txmutils.SetTxConfig) error {
if err := txm.Ready(); err != nil {
return fmt.Errorf("error in soltxm.Enqueue: %w", err)
}
Expand Down Expand Up @@ -834,7 +834,7 @@ func (txm *Txm) simulateTx(ctx context.Context, tx *solanaGo.Transaction) (res *
}

// ProcessError parses and handles relevant errors found in simulation results
func (txm *Txm) ProcessError(sig solanaGo.Signature, resErr interface{}, simulation bool) (txState TxState, errType TxErrType) {
func (txm *Txm) ProcessError(sig solanaGo.Signature, resErr interface{}, simulation bool) (txState txmutils.TxState, errType TxErrType) {
if resErr != nil {
// handle various errors
// https://github.com/solana-labs/solana/blob/master/sdk/src/transaction/error.rs
Expand Down
4 changes: 2 additions & 2 deletions pkg/solana/txm/txm_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func TestTxm(t *testing.T) {
// send tx - with disabled fee bumping
testTxID := uuid.New().String()
lastValidBlockHeight := uint64(100)
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, lastValidBlockHeight, SetFeeBumpPeriod(0)))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, lastValidBlockHeight, txmutils.SetFeeBumpPeriod(0)))
wg.Wait()

// no transactions stored inflight txs list
Expand Down Expand Up @@ -741,7 +741,7 @@ func TestTxm(t *testing.T) {
// send tx - with disabled fee bumping and disabled compute unit limit
testTxID := uuid.New().String()
lastValidBlockHeight := uint64(100)
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, lastValidBlockHeight, SetFeeBumpPeriod(0), SetComputeUnitLimit(0)))
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &testTxID, lastValidBlockHeight, txmutils.SetFeeBumpPeriod(0), txmutils.SetComputeUnitLimit(0)))
wg.Wait()

// no transactions stored inflight txs list
Expand Down

0 comments on commit bc4d56c

Please sign in to comment.