Skip to content

Commit

Permalink
MultiNode Transaction Sender (#879)
Browse files Browse the repository at this point in the history
* MultiNode integration setup

* Update MultiNode files

* Add MultiNode flag

* Remove internal dependency

* Fix build

* Fix import cycle

* tidy

* Update client_test.go

* lint

* Fix duplicate metrics

* Add chain multinode flag

* Extend client

* Implement rpc client methods

* Add defaults

* Add latest block methods

* Address comments

* lint

* Fix lint overflow issues

* Update transaction_sender.go

* Fix lint

* Validate node config

* Update toml.go

* Add SendOnly nodes

* Use pointers on config

* Add test outlines

* Use test context

* Use configured selection mode

* Set defaults

* lint

* Add nil check

* Add client test

* Add subscription test

* tidy

* Fix imports

* Update chain_test.go

* Update multinode.go

* Add comments

* Update multinode.go

* Wrap multinode config

* Fix imports

* Update .golangci.yml

* Use MultiNode

* Add multinode to txm

* Use MultiNode

* Update chain.go

* Update balance_test.go

* Add retries

* Fix head

* Update client.go

* lint

* lint

* Use MultiNode TxSender

* Update txm_internal_test.go

* Address comments

* Remove total difficulty

* Register polling subs

* Extract MultiNodeClient

* Remove caching changes

* Undo cache changes

* Fix tests

* Update chain.go

* Fix variables

* Move classify errors

* Fix imports

* lint

* Update txm_internal_test.go

* Update txm_internal_test.go

* lint

* Fix error classification

* Update txm_internal_test.go

* Update multinode_client.go

* lint

* Update classify_errors.go

* Update classify_errors.go

* Add tests

* Add test coverage

* lint

* Add dial comment

* CTF bump for image build

* Update pkg/solana/client/multinode_client.go

Co-authored-by: Dmytro Haidashenko <[email protected]>

* Update txm.go

* Create loader

* Update transaction_sender.go

* Fix tests

* Update txm_internal_test.go

* lint

* Update txm.go

* Add ctx

* Fix imports

* Add SendTxResult to TxSender

* Update chain_test.go

* Move error classification

* Use loader

* Use loader in txm tests

* lint

* Update loader

* Use single RPC

* Fix tests

* lint

* Address comments

* Update classify_errors.go

* Update errors

* lint

* Fix SendTransaction

* Update chain.go

* Update sendTx

* Fix ctx issues

* Update ctx

* Update transaction_sender.go

* Update transaction_sender.go

* Update transaction_sender.go

* Update ctx

* Add timer

* Update transaction_sender.go

* Update transaction_sender.go

* Fix ctx

* Remove debug logging

* lint

* Fix ctx cancel

* Fix DoAll ctx

* defer reportWg

---------

Co-authored-by: Damjan Smickovski <[email protected]>
Co-authored-by: Dmytro Haidashenko <[email protected]>
  • Loading branch information
3 people authored Nov 5, 2024
1 parent 39cabce commit b49b2be
Show file tree
Hide file tree
Showing 17 changed files with 702 additions and 212 deletions.
44 changes: 28 additions & 16 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/internal"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm"
)
Expand Down Expand Up @@ -90,7 +91,7 @@ type chain struct {

// if multiNode is enabled, the clientCache will not be used
multiNode *mn.MultiNode[mn.StringID, *client.MultiNodeClient]
txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.MultiNodeClient]
txSender *mn.TransactionSender[*solanago.Transaction, *client.SendTxResult, mn.StringID, *client.MultiNodeClient]

// tracking node chain id for verification
clientCache map[string]*verifiedCachedClient // map URL -> {client, chainId} [mainnet/testnet/devnet/localnet]
Expand Down Expand Up @@ -230,6 +231,12 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L
clientCache: map[string]*verifiedCachedClient{},
}

var tc internal.Loader[client.ReaderWriter] = utils.NewLazyLoad(func() (client.ReaderWriter, error) { return ch.getClient() })
var bc internal.Loader[monitor.BalanceClient] = utils.NewLazyLoad(func() (monitor.BalanceClient, error) { return ch.getClient() })

// txm will default to sending transactions using a single RPC client if sendTx is nil
var sendTx func(ctx context.Context, tx *solanago.Transaction) (solanago.Signature, error)

if cfg.MultiNode.Enabled() {
chainFamily := "solana"

Expand Down Expand Up @@ -268,18 +275,12 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L
mnCfg.DeathDeclarationDelay(),
)

// TODO: implement error classification; move logic to separate file if large
// TODO: might be useful to reference anza-xyz/agave@master/sdk/src/transaction/error.rs
classifySendError := func(tx *solanago.Transaction, err error) mn.SendTxReturnCode {
return 0 // TODO ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, false)
}

txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.MultiNodeClient](
txSender := mn.NewTransactionSender[*solanago.Transaction, *client.SendTxResult, mn.StringID, *client.MultiNodeClient](
lggr,
mn.StringID(id),
chainFamily,
multiNode,
classifySendError,
client.NewSendTxResult,
0, // use the default value provided by the implementation
)

Expand All @@ -288,13 +289,24 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L

// clientCache will not be used if multinode is enabled
ch.clientCache = nil
}

tc := func() (client.ReaderWriter, error) {
return ch.getClient()
// Send tx using MultiNode transaction sender
sendTx = func(ctx context.Context, tx *solanago.Transaction) (solanago.Signature, error) {
result := ch.txSender.SendTransaction(ctx, tx)
if result == nil {
return solanago.Signature{}, errors.New("tx sender returned nil result")
}
if result.Error() != nil {
return solanago.Signature{}, result.Error()
}
return result.Signature(), result.TxError()
}

tc = internal.NewLoader[client.ReaderWriter](func() (client.ReaderWriter, error) { return ch.multiNode.SelectRPC() })
bc = internal.NewLoader[monitor.BalanceClient](func() (monitor.BalanceClient, error) { return ch.multiNode.SelectRPC() })
}
ch.txm = txm.NewTxm(ch.id, tc, cfg, ks, lggr)
bc := func() (monitor.BalanceClient, error) { return ch.getClient() }

ch.txm = txm.NewTxm(ch.id, tc, sendTx, cfg, ks, lggr)
ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc)
return &ch, nil
}
Expand Down
Loading

0 comments on commit b49b2be

Please sign in to comment.