Skip to content

Commit

Permalink
Updated Solana TXM in-memory storage layer to track transactions stat…
Browse files Browse the repository at this point in the history
…es (#909)

* Updated the in-memory storage to use state maps to better track transactions across their lifecycle

* Removed all tx map from in-memory storage

* Moved retention timeout logic into OnFinalized and OnError methods

* Updated internal tests and fixed linting

* Added check for same state transition calls on transactions

* Updated logs and fixed chain test

* Added new internal TXM tests and moved tx ID generation to Enqueue

* Updated broadcast log and fixed confirm timeout logic

* Fixed linting

* Updated internal tests to validate reap mechanism

* Updated comment

* Updated error messages

* Fixed tests

* Fixed internal tests and linting

* Reverted predefined error and updated error logs

* Fixed chain test

* Updated keystore Accounts mock

* Added errors to state change methods and updated logs

* Encapsulated in-memory storage locking in separate methods

* Fixed tests and linting

* Added tests for add signature and get tx state

* Fixed linting
  • Loading branch information
amit-momin authored Nov 12, 2024
1 parent efd6780 commit 65ae137
Show file tree
Hide file tree
Showing 17 changed files with 2,170 additions and 449 deletions.
2 changes: 1 addition & 1 deletion pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func (c *chain) sendTx(ctx context.Context, from, to string, amount *big.Int, ba
}

chainTxm := c.TxManager()
err = chainTxm.Enqueue(ctx, "", tx,
err = chainTxm.Enqueue(ctx, "", tx, nil,
txm.SetComputeUnitLimit(500), // reduce from default 200K limit - should only take 450 compute units
// no fee bumping and no additional fee - makes validating balance accurate
txm.SetComputeUnitPriceMax(0),
Expand Down
11 changes: 6 additions & 5 deletions pkg/solana/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,11 @@ func TestChain_Transact(t *testing.T) {
require.NoError(t, c.txm.Start(ctx))

require.NoError(t, c.Transact(ctx, sender.PublicKey().String(), receiver.PublicKey().String(), amount, true))
tests.AssertLogEventually(t, logs, "tx state: confirmed")
tests.AssertLogEventually(t, logs, "marking transaction as confirmed")
tests.AssertLogEventually(t, logs, "stopped tx retry")
require.NoError(t, c.txm.Close())

filteredLogs := logs.FilterMessage("tx state: confirmed").All()
filteredLogs := logs.FilterMessage("marking transaction as confirmed").All()
require.Len(t, filteredLogs, 1)
sig, ok := filteredLogs[0].ContextMap()["signature"]
require.True(t, ok)
Expand Down Expand Up @@ -515,6 +515,7 @@ func TestSolanaChain_MultiNode_Txm(t *testing.T) {
return sig[:]
}, nil)
mkey.On("Sign", mock.Anything, pubKeyReceiver.String(), mock.Anything).Return([]byte{}, config.KeyNotFoundError{ID: pubKeyReceiver.String(), KeyType: "Solana"})
mkey.On("Accounts", mock.Anything).Return([]string{pubKey.String()}, nil).Maybe()

testChain, err := newChain("localnet", cfg, mkey, logger.Test(t))
require.NoError(t, err)
Expand Down Expand Up @@ -556,7 +557,7 @@ func TestSolanaChain_MultiNode_Txm(t *testing.T) {
}

// Send funds twice, along with an invalid transaction
require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)))
require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil))

// Wait for new block hash
currentBh, err := selectedClient.LatestBlockhash(tests.Context(t))
Expand All @@ -577,8 +578,8 @@ NewBlockHash:
}
}

require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success_2", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)))
require.Error(t, testChain.txm.Enqueue(tests.Context(t), "test_invalidSigner", createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL))) // cannot sign tx before enqueuing
require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success_2", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil))
require.Error(t, testChain.txm.Enqueue(tests.Context(t), "test_invalidSigner", createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil)) // cannot sign tx before enqueuing

// wait for all txes to finish
ctx, cancel := context.WithCancel(tests.Context(t))
Expand Down
8 changes: 7 additions & 1 deletion pkg/solana/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ var defaultConfigSet = Chain{
OCR2CacheTTL: config.MustNewDuration(time.Minute), // stale cache deadline
TxTimeout: config.MustNewDuration(time.Minute), // timeout for send tx method in client
TxRetryTimeout: config.MustNewDuration(10 * time.Second), // duration for tx rebroadcasting to RPC node
TxConfirmTimeout: config.MustNewDuration(30 * time.Second), // duration before discarding tx as unconfirmed
TxConfirmTimeout: config.MustNewDuration(30 * time.Second), // duration before discarding tx as unconfirmed. Set to 0 to disable discarding tx.
TxRetentionTimeout: config.MustNewDuration(0 * time.Second), // duration to retain transactions after being marked as finalized or errored. Set to 0 to immediately drop transactions.
SkipPreflight: ptr(true), // to enable or disable preflight checks
Commitment: ptr(string(rpc.CommitmentConfirmed)),
MaxRetries: ptr(int64(0)), // max number of retries (default = 0). when config.MaxRetries < 0), interpreted as MaxRetries = nil and rpc node will do a reasonable number of retries
Expand All @@ -43,6 +44,7 @@ type Config interface {
TxTimeout() time.Duration
TxRetryTimeout() time.Duration
TxConfirmTimeout() time.Duration
TxRetentionTimeout() time.Duration
SkipPreflight() bool
Commitment() rpc.CommitmentType
MaxRetries() *uint
Expand All @@ -67,6 +69,7 @@ type Chain struct {
TxTimeout *config.Duration
TxRetryTimeout *config.Duration
TxConfirmTimeout *config.Duration
TxRetentionTimeout *config.Duration
SkipPreflight *bool
Commitment *string
MaxRetries *int64
Expand Down Expand Up @@ -103,6 +106,9 @@ func (c *Chain) SetDefaults() {
if c.TxConfirmTimeout == nil {
c.TxConfirmTimeout = defaultConfigSet.TxConfirmTimeout
}
if c.TxRetentionTimeout == nil {
c.TxRetentionTimeout = defaultConfigSet.TxRetentionTimeout
}
if c.SkipPreflight == nil {
c.SkipPreflight = defaultConfigSet.SkipPreflight
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/solana/config/mocks/config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/solana/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ func setFromChain(c, f *Chain) {
if f.TxConfirmTimeout != nil {
c.TxConfirmTimeout = f.TxConfirmTimeout
}
if f.TxRetentionTimeout != nil {
c.TxRetentionTimeout = f.TxRetentionTimeout
}
if f.SkipPreflight != nil {
c.SkipPreflight = f.SkipPreflight
}
Expand Down Expand Up @@ -238,6 +241,9 @@ func (c *TOMLConfig) TxConfirmTimeout() time.Duration {
return c.Chain.TxConfirmTimeout.Duration()
}

func (c *TOMLConfig) TxRetentionTimeout() time.Duration {
return c.Chain.TxRetentionTimeout.Duration()
}
func (c *TOMLConfig) SkipPreflight() bool {
return *c.Chain.SkipPreflight
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
var _ TxManager = (*txm.Txm)(nil)

type TxManager interface {
Enqueue(ctx context.Context, accountID string, msg *solana.Transaction, txCfgs ...txm.SetTxConfig) error
Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, txCfgs ...txm.SetTxConfig) error
}

var _ relaytypes.Relayer = &Relayer{} //nolint:staticcheck
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *Transmitter) Transmit(

// pass transmit payload to tx manager queue
c.lggr.Debugf("Queuing transmit tx: state (%s) + transmissions (%s)", c.stateID.String(), c.transmissionsID.String())
if err = c.txManager.Enqueue(ctx, c.stateID.String(), tx); err != nil {
if err = c.txManager.Enqueue(ctx, c.stateID.String(), tx, nil); err != nil {
return fmt.Errorf("error on Transmit.txManager.Enqueue: %w", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type verifyTxSize struct {
s *solana.PrivateKey
}

func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, _ ...txm.SetTxConfig) error {
func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, txID *string, _ ...txm.SetTxConfig) error {
// additional components that transaction manager adds to the transaction
require.NoError(txm.t, fees.SetComputeUnitPrice(tx, 0))
require.NoError(txm.t, fees.SetComputeUnitLimit(tx, 0))
Expand Down
Loading

0 comments on commit 65ae137

Please sign in to comment.