diff --git a/integration-tests/testconfig/testconfig.go b/integration-tests/testconfig/testconfig.go index 1f482b7f5..3582e72e6 100644 --- a/integration-tests/testconfig/testconfig.go +++ b/integration-tests/testconfig/testconfig.go @@ -3,14 +3,13 @@ package testconfig import ( "embed" "encoding/base64" + "errors" "fmt" "log" "os" "strings" "time" - "errors" - "github.com/barkimedes/go-deepcopy" "github.com/google/uuid" "github.com/pelletier/go-toml/v2" @@ -265,13 +264,9 @@ func (c *TestConfig) GetNodeConfigTOML() (string, error) { url = c.GetURL() } - mnConfig := solcfg.MultiNodeConfig{ - MultiNode: solcfg.MultiNode{ - Enabled: ptr.Ptr(true), - SyncThreshold: ptr.Ptr(uint32(170)), - }, - } - mnConfig.SetDefaults() + mnConfig := solcfg.NewDefaultMultiNodeConfig() + mnConfig.MultiNode.Enabled = ptr.Ptr(true) + mnConfig.MultiNode.SyncThreshold = ptr.Ptr(uint32(170)) var nodes []*solcfg.Node for i, u := range url { diff --git a/pkg/solana/chain_test.go b/pkg/solana/chain_test.go index c2182e7c1..3a9550c48 100644 --- a/pkg/solana/chain_test.go +++ b/pkg/solana/chain_test.go @@ -335,12 +335,8 @@ func TestSolanaChain_MultiNode_GetClient(t *testing.T) { ch := solcfg.Chain{} ch.SetDefaults() - mnCfg := solcfg.MultiNodeConfig{ - MultiNode: solcfg.MultiNode{ - Enabled: ptr(true), - }, - } - mnCfg.SetDefaults() + mnCfg := solcfg.NewDefaultMultiNodeConfig() + mnCfg.MultiNode.Enabled = ptr(true) cfg := &solcfg.TOMLConfig{ ChainID: ptr("devnet"), diff --git a/pkg/solana/client/multinode/adaptor.go b/pkg/solana/client/multinode/adaptor.go new file mode 100644 index 000000000..c01bb7880 --- /dev/null +++ b/pkg/solana/client/multinode/adaptor.go @@ -0,0 +1,274 @@ +package client + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + + mnCfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode/config" +) + +// MultiNodeAdapter is used to integrate multinode into chain-specific clients +type MultiNodeAdapter[RPC any, HEAD Head] struct { + cfg *mnCfg.MultiNodeConfig + log logger.Logger + rpc *RPC + ctxTimeout time.Duration + stateMu sync.RWMutex // protects state* fields + subsSliceMu sync.RWMutex + subs map[Subscription]struct{} + + latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error) + latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error) + + // chStopInFlight can be closed to immediately cancel all in-flight requests on + // this RpcMultiNodeAdapter. Closing and replacing should be serialized through + // stateMu since it can happen on state transitions as well as RpcMultiNodeAdapter Close. + chStopInFlight chan struct{} + + chainInfoLock sync.RWMutex + // intercepted values seen by callers of the rpcMultiNodeAdapter excluding health check calls. Need to ensure MultiNode provides repeatable read guarantee + highestUserObservations ChainInfo + // most recent chain info observed during current lifecycle (reseted on DisconnectAll) + latestChainInfo ChainInfo +} + +func NewMultiNodeAdapter[RPC any, HEAD Head]( + cfg *mnCfg.MultiNodeConfig, rpc *RPC, ctxTimeout time.Duration, log logger.Logger, + latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error), + latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error), +) (*MultiNodeAdapter[RPC, HEAD], error) { + return &MultiNodeAdapter[RPC, HEAD]{ + cfg: cfg, + rpc: rpc, + log: log, + ctxTimeout: ctxTimeout, + latestBlock: latestBlock, + latestFinalizedBlock: latestFinalizedBlock, + subs: make(map[Subscription]struct{}), + chStopInFlight: make(chan struct{}), + }, nil +} + +func (m *MultiNodeAdapter[RPC, HEAD]) LenSubs() int { + m.subsSliceMu.RLock() + defer m.subsSliceMu.RUnlock() + return len(m.subs) +} + +// registerSub adds the sub to the rpcMultiNodeAdapter list +func (m *MultiNodeAdapter[RPC, HEAD]) registerSub(sub Subscription, stopInFLightCh chan struct{}) error { + m.subsSliceMu.Lock() + defer m.subsSliceMu.Unlock() + // ensure that the `sub` belongs to current life cycle of the `rpcMultiNodeAdapter` and it should not be killed due to + // previous `DisconnectAll` call. + select { + case <-stopInFLightCh: + sub.Unsubscribe() + return fmt.Errorf("failed to register subscription - all in-flight requests were canceled") + default: + } + // TODO: BCI-3358 - delete sub when caller unsubscribes. + m.subs[sub] = struct{}{} + return nil +} + +func (m *MultiNodeAdapter[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) { + // capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle + ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout) + defer cancel() + + head, err := m.latestBlock(ctx, rpc) + if err != nil { + return head, err + } + + if !head.IsValid() { + return head, errors.New("invalid head") + } + + m.onNewHead(ctx, chStopInFlight, head) + return head, nil +} + +func (m *MultiNodeAdapter[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) { + ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout) + defer cancel() + + head, err := m.latestFinalizedBlock(ctx, rpc) + if err != nil { + return head, err + } + + if !head.IsValid() { + return head, errors.New("invalid head") + } + + m.OnNewFinalizedHead(ctx, chStopInFlight, head) + return head, nil +} + +func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error) { + ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout) + defer cancel() + + // TODO: BCFR-1070 - Add BlockPollInterval + pollInterval := m.cfg.FinalizedBlockPollInterval() // Use same interval as finalized polling + if pollInterval == 0 { + return nil, nil, errors.New("PollInterval is 0") + } + timeout := pollInterval + poller, channel := NewPoller[HEAD](pollInterval, func(pollRequestCtx context.Context) (HEAD, error) { + if CtxIsHeathCheckRequest(ctx) { + pollRequestCtx = CtxAddHealthCheckFlag(pollRequestCtx) + } + return m.LatestBlock(pollRequestCtx) + }, timeout, m.log) + + if err := poller.Start(ctx); err != nil { + return nil, nil, err + } + + err := m.registerSub(&poller, chStopInFlight) + if err != nil { + poller.Unsubscribe() + return nil, nil, err + } + + return channel, &poller, nil +} + +func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) { + ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout) + defer cancel() + + finalizedBlockPollInterval := m.cfg.FinalizedBlockPollInterval() + if finalizedBlockPollInterval == 0 { + return nil, nil, errors.New("FinalizedBlockPollInterval is 0") + } + timeout := finalizedBlockPollInterval + poller, channel := NewPoller[HEAD](finalizedBlockPollInterval, func(pollRequestCtx context.Context) (HEAD, error) { + if CtxIsHeathCheckRequest(ctx) { + pollRequestCtx = CtxAddHealthCheckFlag(pollRequestCtx) + } + return m.LatestFinalizedBlock(pollRequestCtx) + }, timeout, m.log) + if err := poller.Start(ctx); err != nil { + return nil, nil, err + } + + err := m.registerSub(&poller, chStopInFlight) + if err != nil { + poller.Unsubscribe() + return nil, nil, err + } + + return channel, &poller, nil +} + +func (m *MultiNodeAdapter[RPC, HEAD]) onNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) { + if !head.IsValid() { + return + } + + m.chainInfoLock.Lock() + defer m.chainInfoLock.Unlock() + if !CtxIsHeathCheckRequest(ctx) { + m.highestUserObservations.BlockNumber = max(m.highestUserObservations.BlockNumber, head.BlockNumber()) + } + select { + case <-requestCh: // no need to update latestChainInfo, as rpcMultiNodeAdapter already started new life cycle + return + default: + m.latestChainInfo.BlockNumber = head.BlockNumber() + } +} + +func (m *MultiNodeAdapter[RPC, HEAD]) OnNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) { + if !head.IsValid() { + return + } + + m.chainInfoLock.Lock() + defer m.chainInfoLock.Unlock() + if !CtxIsHeathCheckRequest(ctx) { + m.highestUserObservations.FinalizedBlockNumber = max(m.highestUserObservations.FinalizedBlockNumber, head.BlockNumber()) + } + select { + case <-requestCh: // no need to update latestChainInfo, as rpcMultiNodeAdapter already started new life cycle + return + default: + m.latestChainInfo.FinalizedBlockNumber = head.BlockNumber() + } +} + +// MakeQueryCtx returns a context that cancels if: +// 1. Passed in ctx cancels +// 2. Passed in channel is closed +// 3. Default timeout is reached (queryTimeout) +func MakeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Duration) (context.Context, context.CancelFunc) { + var chCancel, timeoutCancel context.CancelFunc + ctx, chCancel = ch.Ctx(ctx) + ctx, timeoutCancel = context.WithTimeout(ctx, timeout) + cancel := func() { + chCancel() + timeoutCancel() + } + return ctx, cancel +} + +func (m *MultiNodeAdapter[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, + chStopInFlight chan struct{}, raw *RPC) { + // Need to wrap in mutex because state transition can cancel and replace context + m.stateMu.RLock() + chStopInFlight = m.chStopInFlight + cp := *m.rpc + raw = &cp + m.stateMu.RUnlock() + ctx, cancel = MakeQueryCtx(parentCtx, chStopInFlight, timeout) + return +} + +func (m *MultiNodeAdapter[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) { + m.subsSliceMu.Lock() + defer m.subsSliceMu.Unlock() + + keepSubs := map[Subscription]struct{}{} + for _, sub := range subs { + keepSubs[sub] = struct{}{} + } + + for sub := range m.subs { + if _, keep := keepSubs[sub]; !keep { + sub.Unsubscribe() + delete(m.subs, sub) + } + } +} + +// cancelInflightRequests closes and replaces the chStopInFlight +func (m *MultiNodeAdapter[RPC, HEAD]) cancelInflightRequests() { + m.stateMu.Lock() + defer m.stateMu.Unlock() + close(m.chStopInFlight) + m.chStopInFlight = make(chan struct{}) +} + +func (m *MultiNodeAdapter[RPC, HEAD]) Close() { + m.cancelInflightRequests() + m.UnsubscribeAllExcept() + m.chainInfoLock.Lock() + m.latestChainInfo = ChainInfo{} + m.chainInfoLock.Unlock() +} + +func (m *MultiNodeAdapter[RPC, HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) { + m.chainInfoLock.Lock() + defer m.chainInfoLock.Unlock() + return m.latestChainInfo, m.highestUserObservations +} diff --git a/pkg/solana/client/multinode/adaptor_test.go b/pkg/solana/client/multinode/adaptor_test.go new file mode 100644 index 000000000..3c47ecab7 --- /dev/null +++ b/pkg/solana/client/multinode/adaptor_test.go @@ -0,0 +1,174 @@ +package client + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/stretchr/testify/require" + + common "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + + "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode/config" +) + +type testRPC struct { + latestBlock int64 +} + +type testHead struct { + blockNumber int64 +} + +func (t *testHead) BlockNumber() int64 { return t.blockNumber } +func (t *testHead) BlockDifficulty() *big.Int { return nil } +func (t *testHead) IsValid() bool { return true } + +func LatestBlock(ctx context.Context, rpc *testRPC) (*testHead, error) { + rpc.latestBlock++ + return &testHead{rpc.latestBlock}, nil +} + +func ptr[T any](t T) *T { + return &t +} + +func newTestClient(t *testing.T) *MultiNodeAdapter[testRPC, *testHead] { + requestTimeout := 5 * time.Second + lggr := logger.Test(t) + cfg := &config.MultiNodeConfig{ + MultiNode: config.MultiNode{ + Enabled: ptr(true), + PollFailureThreshold: ptr(uint32(5)), + PollInterval: common.MustNewDuration(15 * time.Second), + SelectionMode: ptr(NodeSelectionModePriorityLevel), + SyncThreshold: ptr(uint32(10)), + LeaseDuration: common.MustNewDuration(time.Minute), + NodeIsSyncingEnabled: ptr(false), + FinalizedBlockPollInterval: common.MustNewDuration(5 * time.Second), + EnforceRepeatableRead: ptr(true), + DeathDeclarationDelay: common.MustNewDuration(20 * time.Second), + NodeNoNewHeadsThreshold: common.MustNewDuration(20 * time.Second), + NoNewFinalizedHeadsThreshold: common.MustNewDuration(20 * time.Second), + FinalityTagEnabled: ptr(true), + FinalityDepth: ptr(uint32(0)), + FinalizedBlockOffset: ptr(uint32(50)), + }, + } + c, err := NewMultiNodeAdapter[testRPC, *testHead](cfg, &testRPC{}, requestTimeout, lggr, LatestBlock, LatestBlock) + require.NoError(t, err) + t.Cleanup(c.Close) + return c +} + +func TestMultiNodeClient_LatestBlock(t *testing.T) { + t.Run("LatestBlock", func(t *testing.T) { + c := newTestClient(t) + head, err := c.LatestBlock(tests.Context(t)) + require.NoError(t, err) + require.Equal(t, true, head.IsValid()) + }) + + t.Run("LatestFinalizedBlock", func(t *testing.T) { + c := newTestClient(t) + finalizedHead, err := c.LatestFinalizedBlock(tests.Context(t)) + require.NoError(t, err) + require.Equal(t, true, finalizedHead.IsValid()) + }) +} + +func TestMultiNodeClient_HeadSubscriptions(t *testing.T) { + t.Run("SubscribeToHeads", func(t *testing.T) { + c := newTestClient(t) + ch, sub, err := c.SubscribeToHeads(tests.Context(t)) + require.NoError(t, err) + defer sub.Unsubscribe() + + ctx, cancel := context.WithTimeout(tests.Context(t), time.Minute) + defer cancel() + select { + case head := <-ch: + latest, _ := c.GetInterceptedChainInfo() + require.Equal(t, head.BlockNumber(), latest.BlockNumber) + case <-ctx.Done(): + t.Fatal("failed to receive head: ", ctx.Err()) + } + }) + + t.Run("SubscribeToFinalizedHeads", func(t *testing.T) { + c := newTestClient(t) + finalizedCh, finalizedSub, err := c.SubscribeToFinalizedHeads(tests.Context(t)) + require.NoError(t, err) + defer finalizedSub.Unsubscribe() + + ctx, cancel := context.WithTimeout(tests.Context(t), time.Minute) + defer cancel() + select { + case finalizedHead := <-finalizedCh: + latest, _ := c.GetInterceptedChainInfo() + require.Equal(t, finalizedHead.BlockNumber(), latest.FinalizedBlockNumber) + case <-ctx.Done(): + t.Fatal("failed to receive finalized head: ", ctx.Err()) + } + }) +} + +type mockSub struct { + unsubscribed bool +} + +func newMockSub() *mockSub { + return &mockSub{unsubscribed: false} +} + +func (s *mockSub) Unsubscribe() { + s.unsubscribed = true +} +func (s *mockSub) Err() <-chan error { + return nil +} + +func TestMultiNodeClient_RegisterSubs(t *testing.T) { + t.Run("registerSub", func(t *testing.T) { + c := newTestClient(t) + sub := newMockSub() + err := c.registerSub(sub, make(chan struct{})) + require.NoError(t, err) + require.Equal(t, 1, c.LenSubs()) + c.UnsubscribeAllExcept() + }) + + t.Run("chStopInFlight returns error and unsubscribes", func(t *testing.T) { + c := newTestClient(t) + chStopInFlight := make(chan struct{}) + close(chStopInFlight) + sub := newMockSub() + err := c.registerSub(sub, chStopInFlight) + require.Error(t, err) + require.Equal(t, true, sub.unsubscribed) + }) + + t.Run("UnsubscribeAllExcept", func(t *testing.T) { + c := newTestClient(t) + chStopInFlight := make(chan struct{}) + sub1 := newMockSub() + sub2 := newMockSub() + err := c.registerSub(sub1, chStopInFlight) + require.NoError(t, err) + err = c.registerSub(sub2, chStopInFlight) + require.NoError(t, err) + require.Equal(t, 2, c.LenSubs()) + + c.UnsubscribeAllExcept(sub1) + require.Equal(t, 1, c.LenSubs()) + require.Equal(t, true, sub2.unsubscribed) + require.Equal(t, false, sub1.unsubscribed) + + c.UnsubscribeAllExcept() + require.Equal(t, 0, c.LenSubs()) + require.Equal(t, true, sub1.unsubscribed) + }) +} diff --git a/pkg/solana/config/multinode.go b/pkg/solana/client/multinode/config/config.go similarity index 57% rename from pkg/solana/config/multinode.go rename to pkg/solana/client/multinode/config/config.go index d002d489e..ba879553e 100644 --- a/pkg/solana/config/multinode.go +++ b/pkg/solana/client/multinode/config/config.go @@ -4,8 +4,6 @@ import ( "time" "github.com/smartcontractkit/chainlink-common/pkg/config" - - mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" ) // MultiNodeConfig is a wrapper to provide required functions while keeping configs Public @@ -86,73 +84,6 @@ func (c *MultiNodeConfig) FinalityTagEnabled() bool { return *c.MultiNode.Finali func (c *MultiNodeConfig) FinalizedBlockOffset() uint32 { return *c.MultiNode.FinalizedBlockOffset } -func (c *MultiNodeConfig) SetDefaults() { - // MultiNode is disabled as it's not fully implemented yet: BCFR-122 - if c.MultiNode.Enabled == nil { - c.MultiNode.Enabled = ptr(false) - } - - /* Node Configs */ - // Failure threshold for polling set to 5 to tolerate some polling failures before taking action. - if c.MultiNode.PollFailureThreshold == nil { - c.MultiNode.PollFailureThreshold = ptr(uint32(5)) - } - // Poll interval is set to 15 seconds to ensure timely updates while minimizing resource usage. - if c.MultiNode.PollInterval == nil { - c.MultiNode.PollInterval = config.MustNewDuration(15 * time.Second) - } - // Selection mode defaults to priority level to enable using node priorities - if c.MultiNode.SelectionMode == nil { - c.MultiNode.SelectionMode = ptr(mn.NodeSelectionModePriorityLevel) - } - // The sync threshold is set to 10 to allow for some flexibility in node synchronization before considering it out of sync. - if c.MultiNode.SyncThreshold == nil { - c.MultiNode.SyncThreshold = ptr(uint32(10)) - } - // Lease duration is set to 1 minute by default to allow node locks for a reasonable amount of time. - if c.MultiNode.LeaseDuration == nil { - c.MultiNode.LeaseDuration = config.MustNewDuration(time.Minute) - } - // Node syncing is not relevant for Solana and is disabled by default. - if c.MultiNode.NodeIsSyncingEnabled == nil { - c.MultiNode.NodeIsSyncingEnabled = ptr(false) - } - // The finalized block polling interval is set to 5 seconds to ensure timely updates while minimizing resource usage. - if c.MultiNode.FinalizedBlockPollInterval == nil { - c.MultiNode.FinalizedBlockPollInterval = config.MustNewDuration(5 * time.Second) - } - // Repeatable read guarantee should be enforced by default. - if c.MultiNode.EnforceRepeatableRead == nil { - c.MultiNode.EnforceRepeatableRead = ptr(true) - } - // The delay before declaring a node dead is set to 20 seconds to give nodes time to recover from temporary issues. - if c.MultiNode.DeathDeclarationDelay == nil { - c.MultiNode.DeathDeclarationDelay = config.MustNewDuration(20 * time.Second) - } - - /* Chain Configs */ - // Threshold for no new heads is set to 20 seconds, assuming that heads should update at a reasonable pace. - if c.MultiNode.NodeNoNewHeadsThreshold == nil { - c.MultiNode.NodeNoNewHeadsThreshold = config.MustNewDuration(20 * time.Second) - } - // Similar to heads, finalized heads should be updated within 20 seconds. - if c.MultiNode.NoNewFinalizedHeadsThreshold == nil { - c.MultiNode.NoNewFinalizedHeadsThreshold = config.MustNewDuration(20 * time.Second) - } - // Finality tags are used in Solana and enabled by default. - if c.MultiNode.FinalityTagEnabled == nil { - c.MultiNode.FinalityTagEnabled = ptr(true) - } - // Finality depth will not be used since finality tags are enabled. - if c.MultiNode.FinalityDepth == nil { - c.MultiNode.FinalityDepth = ptr(uint32(0)) - } - // Finalized block offset allows for RPCs to be slightly behind the finalized block. - if c.MultiNode.FinalizedBlockOffset == nil { - c.MultiNode.FinalizedBlockOffset = ptr(uint32(50)) - } -} - func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) { if f.MultiNode.Enabled != nil { c.MultiNode.Enabled = f.MultiNode.Enabled diff --git a/pkg/solana/client/multinode_client.go b/pkg/solana/client/multinode_client.go index e6a70de9c..610ef20c8 100644 --- a/pkg/solana/client/multinode_client.go +++ b/pkg/solana/client/multinode_client.go @@ -2,16 +2,14 @@ package client import ( "context" - "errors" "fmt" "math/big" - "sync" "time" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" + "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/services" mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" @@ -44,22 +42,9 @@ var _ mn.RPCClient[mn.StringID, *Head] = (*MultiNodeClient)(nil) var _ mn.SendTxRPCClient[*solana.Transaction, *SendTxResult] = (*MultiNodeClient)(nil) type MultiNodeClient struct { - Client - cfg *config.TOMLConfig - stateMu sync.RWMutex // protects state* fields - subsSliceMu sync.RWMutex - subs map[mn.Subscription]struct{} - - // chStopInFlight can be closed to immediately cancel all in-flight requests on - // this RpcClient. Closing and replacing should be serialized through - // stateMu since it can happen on state transitions as well as RpcClient Close. - chStopInFlight chan struct{} - - chainInfoLock sync.RWMutex - // intercepted values seen by callers of the rpcClient excluding health check calls. Need to ensure MultiNode provides repeatable read guarantee - highestUserObservations mn.ChainInfo - // most recent chain info observed during current lifecycle (reseted on DisconnectAll) - latestChainInfo mn.ChainInfo + *Client + *mn.MultiNodeAdapter[rpc.Client, *Head] + cfg *config.TOMLConfig } func NewMultiNodeClient(endpoint string, cfg *config.TOMLConfig, requestTimeout time.Duration, log logger.Logger) (*MultiNodeClient, error) { @@ -67,206 +52,51 @@ func NewMultiNodeClient(endpoint string, cfg *config.TOMLConfig, requestTimeout if err != nil { return nil, err } - + multiNodeClient, err := mn.NewMultiNodeAdapter[rpc.Client, *Head]( + &cfg.MultiNode, client.rpc, client.contextDuration, client.log, LatestBlock, LatestFinalizedBlock) + if err != nil { + return nil, err + } return &MultiNodeClient{ - Client: *client, - cfg: cfg, - subs: make(map[mn.Subscription]struct{}), - chStopInFlight: make(chan struct{}), + Client: client, + MultiNodeAdapter: multiNodeClient, + cfg: cfg, }, nil } -// registerSub adds the sub to the rpcClient list -func (m *MultiNodeClient) registerSub(sub mn.Subscription, stopInFLightCh chan struct{}) error { - m.subsSliceMu.Lock() - defer m.subsSliceMu.Unlock() - // ensure that the `sub` belongs to current life cycle of the `rpcClient` and it should not be killed due to - // previous `DisconnectAll` call. - select { - case <-stopInFLightCh: - sub.Unsubscribe() - return fmt.Errorf("failed to register subscription - all in-flight requests were canceled") - default: - } - // TODO: BCI-3358 - delete sub when caller unsubscribes. - m.subs[sub] = struct{}{} - return nil -} - func (m *MultiNodeClient) Dial(ctx context.Context) error { - // Not relevant for Solana as the RPCs don't need to be dialled. + // Not relevant for Solana as the RPCs don't need to be dialled.m return nil } -func (m *MultiNodeClient) SubscribeToHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) { - ctx, cancel, chStopInFlight, _ := m.acquireQueryCtx(ctx, m.cfg.TxTimeout()) - defer cancel() - - // TODO: BCFR-1070 - Add BlockPollInterval - pollInterval := m.cfg.MultiNode.FinalizedBlockPollInterval() // Use same interval as finalized polling - if pollInterval == 0 { - return nil, nil, errors.New("PollInterval is 0") - } - timeout := pollInterval - poller, channel := mn.NewPoller[*Head](pollInterval, func(pollRequestCtx context.Context) (*Head, error) { - if mn.CtxIsHeathCheckRequest(ctx) { - pollRequestCtx = mn.CtxAddHealthCheckFlag(pollRequestCtx) - } - return m.LatestBlock(pollRequestCtx) - }, timeout, m.log) - - if err := poller.Start(ctx); err != nil { - return nil, nil, err - } - - err := m.registerSub(&poller, chStopInFlight) - if err != nil { - poller.Unsubscribe() - return nil, nil, err - } - - return channel, &poller, nil -} - -func (m *MultiNodeClient) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) { - ctx, cancel, chStopInFlight, _ := m.acquireQueryCtx(ctx, m.contextDuration) - defer cancel() - - finalizedBlockPollInterval := m.cfg.MultiNode.FinalizedBlockPollInterval() - if finalizedBlockPollInterval == 0 { - return nil, nil, errors.New("FinalizedBlockPollInterval is 0") - } - timeout := finalizedBlockPollInterval - poller, channel := mn.NewPoller[*Head](finalizedBlockPollInterval, func(pollRequestCtx context.Context) (*Head, error) { - if mn.CtxIsHeathCheckRequest(ctx) { - pollRequestCtx = mn.CtxAddHealthCheckFlag(pollRequestCtx) - } - return m.LatestFinalizedBlock(pollRequestCtx) - }, timeout, m.log) - if err := poller.Start(ctx); err != nil { - return nil, nil, err - } - - err := m.registerSub(&poller, chStopInFlight) - if err != nil { - poller.Unsubscribe() - return nil, nil, err - } - - return channel, &poller, nil -} - -func (m *MultiNodeClient) LatestBlock(ctx context.Context) (*Head, error) { - // capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle - ctx, cancel, chStopInFlight, rawRPC := m.acquireQueryCtx(ctx, m.contextDuration) - defer cancel() - +func LatestBlock(ctx context.Context, rawRPC *rpc.Client) (*Head, error) { result, err := rawRPC.GetLatestBlockhash(ctx, rpc.CommitmentConfirmed) if err != nil { return nil, err } - - head := &Head{ + return &Head{ BlockHeight: &result.Value.LastValidBlockHeight, BlockHash: &result.Value.Blockhash, - } - if !head.IsValid() { - return nil, errors.New("invalid head") - } - - m.onNewHead(ctx, chStopInFlight, head) - return head, nil + }, nil } -func (m *MultiNodeClient) LatestFinalizedBlock(ctx context.Context) (*Head, error) { - ctx, cancel, chStopInFlight, rawRPC := m.acquireQueryCtx(ctx, m.contextDuration) - defer cancel() - +func LatestFinalizedBlock(ctx context.Context, rawRPC *rpc.Client) (*Head, error) { result, err := rawRPC.GetLatestBlockhash(ctx, rpc.CommitmentFinalized) if err != nil { return nil, err } - - head := &Head{ + return &Head{ BlockHeight: &result.Value.LastValidBlockHeight, BlockHash: &result.Value.Blockhash, - } - if !head.IsValid() { - return nil, errors.New("invalid head") - } - - m.onNewFinalizedHead(ctx, chStopInFlight, head) - return head, nil -} - -func (m *MultiNodeClient) onNewHead(ctx context.Context, requestCh <-chan struct{}, head *Head) { - if head == nil { - return - } - - m.chainInfoLock.Lock() - defer m.chainInfoLock.Unlock() - if !mn.CtxIsHeathCheckRequest(ctx) { - m.highestUserObservations.BlockNumber = max(m.highestUserObservations.BlockNumber, head.BlockNumber()) - } - select { - case <-requestCh: // no need to update latestChainInfo, as rpcClient already started new life cycle - return - default: - m.latestChainInfo.BlockNumber = head.BlockNumber() - } -} - -func (m *MultiNodeClient) onNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head *Head) { - if head == nil { - return - } - m.chainInfoLock.Lock() - defer m.chainInfoLock.Unlock() - if !mn.CtxIsHeathCheckRequest(ctx) { - m.highestUserObservations.FinalizedBlockNumber = max(m.highestUserObservations.FinalizedBlockNumber, head.BlockNumber()) - } - select { - case <-requestCh: // no need to update latestChainInfo, as rpcClient already started new life cycle - return - default: - m.latestChainInfo.FinalizedBlockNumber = head.BlockNumber() - } -} - -// makeQueryCtx returns a context that cancels if: -// 1. Passed in ctx cancels -// 2. Passed in channel is closed -// 3. Default timeout is reached (queryTimeout) -func makeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Duration) (context.Context, context.CancelFunc) { - var chCancel, timeoutCancel context.CancelFunc - ctx, chCancel = ch.Ctx(ctx) - ctx, timeoutCancel = context.WithTimeout(ctx, timeout) - cancel := func() { - chCancel() - timeoutCancel() - } - return ctx, cancel -} - -func (m *MultiNodeClient) acquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, - chStopInFlight chan struct{}, raw *rpc.Client) { - // Need to wrap in mutex because state transition can cancel and replace context - m.stateMu.RLock() - chStopInFlight = m.chStopInFlight - cp := *m.rpc - raw = &cp - m.stateMu.RUnlock() - ctx, cancel = makeQueryCtx(parentCtx, chStopInFlight, timeout) - return + }, nil } func (m *MultiNodeClient) Ping(ctx context.Context) error { - version, err := m.rpc.GetVersion(ctx) + version, err := m.Client.rpc.GetVersion(ctx) if err != nil { return fmt.Errorf("ping failed: %v", err) } - m.log.Debugf("ping client version: %s", version.SolanaCore) + m.Client.log.Debugf("ping client version: %s", version.SolanaCore) return err } @@ -275,49 +105,14 @@ func (m *MultiNodeClient) IsSyncing(ctx context.Context) (bool, error) { return false, nil } -func (m *MultiNodeClient) UnsubscribeAllExcept(subs ...mn.Subscription) { - m.subsSliceMu.Lock() - defer m.subsSliceMu.Unlock() - - keepSubs := map[mn.Subscription]struct{}{} - for _, sub := range subs { - keepSubs[sub] = struct{}{} - } - - for sub := range m.subs { - if _, keep := keepSubs[sub]; !keep { - sub.Unsubscribe() - delete(m.subs, sub) - } - } -} - -// cancelInflightRequests closes and replaces the chStopInFlight -func (m *MultiNodeClient) cancelInflightRequests() { - m.stateMu.Lock() - defer m.stateMu.Unlock() - close(m.chStopInFlight) - m.chStopInFlight = make(chan struct{}) -} - func (m *MultiNodeClient) Close() { defer func() { - err := m.rpc.Close() + err := m.Client.rpc.Close() if err != nil { - m.log.Errorf("error closing rpc: %v", err) + m.Client.log.Errorf("error closing rpc: %v", err) } }() - m.cancelInflightRequests() - m.UnsubscribeAllExcept() - m.chainInfoLock.Lock() - m.latestChainInfo = mn.ChainInfo{} - m.chainInfoLock.Unlock() -} - -func (m *MultiNodeClient) GetInterceptedChainInfo() (latest, highestUserObservations mn.ChainInfo) { - m.chainInfoLock.Lock() - defer m.chainInfoLock.Unlock() - return m.latestChainInfo, m.highestUserObservations + m.MultiNodeAdapter.Close() } type SendTxResult struct { diff --git a/pkg/solana/client/multinode_client_test.go b/pkg/solana/client/multinode_client_test.go index 04339b5be..e01123d2f 100644 --- a/pkg/solana/client/multinode_client_test.go +++ b/pkg/solana/client/multinode_client_test.go @@ -91,58 +91,3 @@ func TestMultiNodeClient_HeadSubscriptions(t *testing.T) { } }) } - -type mockSub struct { - unsubscribed bool -} - -func newMockSub() *mockSub { - return &mockSub{unsubscribed: false} -} - -func (s *mockSub) Unsubscribe() { - s.unsubscribed = true -} -func (s *mockSub) Err() <-chan error { - return nil -} - -func TestMultiNodeClient_RegisterSubs(t *testing.T) { - c := initializeMultiNodeClient(t) - - t.Run("registerSub", func(t *testing.T) { - sub := newMockSub() - err := c.registerSub(sub, make(chan struct{})) - require.NoError(t, err) - require.Len(t, c.subs, 1) - c.UnsubscribeAllExcept() - }) - - t.Run("chStopInFlight returns error and unsubscribes", func(t *testing.T) { - chStopInFlight := make(chan struct{}) - close(chStopInFlight) - sub := newMockSub() - err := c.registerSub(sub, chStopInFlight) - require.Error(t, err) - require.Equal(t, true, sub.unsubscribed) - }) - - t.Run("UnsubscribeAllExcept", func(t *testing.T) { - chStopInFlight := make(chan struct{}) - sub1 := newMockSub() - sub2 := newMockSub() - err := c.registerSub(sub1, chStopInFlight) - require.NoError(t, err) - err = c.registerSub(sub2, chStopInFlight) - require.NoError(t, err) - require.Len(t, c.subs, 2) - - c.UnsubscribeAllExcept(sub1) - require.Len(t, c.subs, 1) - require.Equal(t, true, sub2.unsubscribed) - - c.UnsubscribeAllExcept() - require.Len(t, c.subs, 0) - require.Equal(t, true, sub1.unsubscribed) - }) -} diff --git a/pkg/solana/config/toml.go b/pkg/solana/config/toml.go index 9d46c0b65..538d9c2c7 100644 --- a/pkg/solana/config/toml.go +++ b/pkg/solana/config/toml.go @@ -12,6 +12,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/config" relaytypes "github.com/smartcontractkit/chainlink-common/pkg/types" + + mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" + mnCfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode/config" ) type TOMLConfigs []*TOMLConfig @@ -113,7 +116,7 @@ type TOMLConfig struct { // Do not access directly, use [IsEnabled] Enabled *bool Chain - MultiNode MultiNodeConfig + MultiNode mnCfg.MultiNodeConfig Nodes Nodes } @@ -312,12 +315,55 @@ func (c *TOMLConfig) ListNodes() Nodes { func (c *TOMLConfig) SetDefaults() { c.Chain.SetDefaults() - c.MultiNode.SetDefaults() + c.MultiNode.SetFrom(defaultMultiNodeConfig) } func NewDefault() *TOMLConfig { cfg := &TOMLConfig{} cfg.Chain.SetDefaults() - cfg.MultiNode.SetDefaults() + cfg.MultiNode.SetFrom(defaultMultiNodeConfig) + return cfg +} + +var defaultMultiNodeConfig = &mnCfg.MultiNodeConfig{ + MultiNode: mnCfg.MultiNode{ + // Have multinode disabled by default + Enabled: ptr(false), + /* Node Configs */ + // Failure threshold for polling set to 5 to tolerate some polling failures before taking action. + PollFailureThreshold: ptr(uint32(5)), + // Poll interval is set to 15 seconds to ensure timely updates while minimizing resource usage. + PollInterval: config.MustNewDuration(15 * time.Second), + // Selection mode defaults to priority level to enable using node priorities + SelectionMode: ptr(mn.NodeSelectionModePriorityLevel), + // The sync threshold is set to 10 to allow for some flexibility in node synchronization before considering it out of sync. + SyncThreshold: ptr(uint32(10)), + // Lease duration is set to 1 minute by default to allow node locks for a reasonable amount of time. + LeaseDuration: config.MustNewDuration(time.Minute), + // Node syncing is not relevant for Solana and is disabled by default. + NodeIsSyncingEnabled: ptr(false), + // The finalized block polling interval is set to 5 seconds to ensure timely updates while minimizing resource usage. + FinalizedBlockPollInterval: config.MustNewDuration(5 * time.Second), + // Repeatable read guarantee should be enforced by default. + EnforceRepeatableRead: ptr(true), + // The delay before declaring a node dead is set to 20 seconds to give nodes time to recover from temporary issues. + DeathDeclarationDelay: config.MustNewDuration(20 * time.Second), + /* Chain Configs */ + // Threshold for no new heads is set to 20 seconds, assuming that heads should update at a reasonable pace. + NodeNoNewHeadsThreshold: config.MustNewDuration(20 * time.Second), + // Similar to heads, finalized heads should be updated within 20 seconds. + NoNewFinalizedHeadsThreshold: config.MustNewDuration(20 * time.Second), + // Finality tags are used in Solana and enabled by default. + FinalityTagEnabled: ptr(true), + // Finality depth will not be used since finality tags are enabled. + FinalityDepth: ptr(uint32(0)), + // Finalized block offset allows for RPCs to be slightly behind the finalized block. + FinalizedBlockOffset: ptr(uint32(50)), + }, +} + +func NewDefaultMultiNodeConfig() mnCfg.MultiNodeConfig { + cfg := mnCfg.MultiNodeConfig{} + cfg.SetFrom(defaultMultiNodeConfig) return cfg }