Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update MultiNode with latest changes #15058

Merged
merged 14 commits into from
Nov 20, 2024
5 changes: 5 additions & 0 deletions .changeset/silver-avocados-buy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Update MultiNode with latest changes and bug fixes. Fixes an issue that caused nodes to go OutOfSync incorrectly, and fixes context handling for sending transactions. #internal #bugfix
2 changes: 0 additions & 2 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ func (c *MultiNode[CHAIN_ID, RPC]) ChainID() CHAIN_ID {
func (c *MultiNode[CHAIN_ID, RPC]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC, isSendOnly bool)) error {
var err error
ok := c.IfNotStopped(func() {
ctx, _ = c.chStop.Ctx(ctx)

callsCompleted := 0
for _, n := range c.primaryNodes {
select {
Expand Down
39 changes: 24 additions & 15 deletions common/client/node_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"math/big"
"time"

"github.com/smartcontractkit/chainlink/v2/common/types"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand All @@ -17,6 +15,7 @@ import (
bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math"

iutils "github.com/smartcontractkit/chainlink/v2/common/internal/utils"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

var (
Expand Down Expand Up @@ -132,6 +131,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
}
}

// Get the latest chain info to use as local highest
localHighestChainInfo, _ := n.rpc.GetInterceptedChainInfo()
var pollFailures uint32

Expand Down Expand Up @@ -168,10 +168,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
n.declareUnreachable()
return
}
_, latestChainInfo := n.StateAndLatest()
if outOfSync, liveNodes := n.isOutOfSyncWithPool(latestChainInfo); outOfSync {
if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync {
DylanTinianov marked this conversation as resolved.
Show resolved Hide resolved
// note: there must be another live node for us to be out of sync
lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", latestChainInfo.BlockNumber, "totalDifficulty", latestChainInfo.TotalDifficulty, "nodeState", n.getCachedState())
if liveNodes < 2 {
lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
continue
Expand Down Expand Up @@ -310,9 +308,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewFinalizedHead(lggr logger.SugaredLogger
}

latestFinalizedBN := latestFinalized.BlockNumber()
lggr.Tracew("Got latest finalized head", "latestFinalized", latestFinalized)
lggr.Debugw("Got latest finalized head", "latestFinalized", latestFinalized)
if latestFinalizedBN <= chainInfo.FinalizedBlockNumber {
lggr.Tracew("Ignoring previously seen finalized block number")
lggr.Debugw("Ignoring previously seen finalized block number")
return false
}

Expand All @@ -328,10 +326,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewHead(lggr logger.SugaredLogger, chainIn
}

promPoolRPCNodeNumSeenBlocks.WithLabelValues(n.chainID.String(), n.name).Inc()
lggr.Tracew("Got head", "head", head)
lggr.Debugw("Got head", "head", head)
lggr = lggr.With("latestReceivedBlockNumber", chainInfo.BlockNumber, "blockNumber", head.BlockNumber(), "nodeState", n.getCachedState())
if head.BlockNumber() <= chainInfo.BlockNumber {
lggr.Tracew("Ignoring previously seen block number")
lggr.Debugw("Ignoring previously seen block number")
return false
}

Expand All @@ -358,7 +356,7 @@ const (
// isOutOfSyncWithPool returns outOfSync true if the block number or total difficulty is more than SyncThreshold behind the best node.
// Always returns outOfSync false for SyncThreshold 0.
// liveNodes is only included when outOfSync is true.
func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (outOfSync bool, liveNodes int) {
func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveNodes int) {
if n.poolInfoProvider == nil {
n.lfcLog.Warn("skipping sync state against the pool - should only occur in tests")
return // skip for tests
Expand All @@ -369,16 +367,22 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (o
}
// Check against best node
ln, ci := n.poolInfoProvider.LatestChainInfo()
localChainInfo, _ := n.rpc.GetInterceptedChainInfo()
mode := n.nodePoolCfg.SelectionMode()
switch mode {
case NodeSelectionModeHighestHead, NodeSelectionModeRoundRobin, NodeSelectionModePriorityLevel:
return localState.BlockNumber < ci.BlockNumber-int64(threshold), ln
outOfSync = localChainInfo.BlockNumber < ci.BlockNumber-int64(threshold)
case NodeSelectionModeTotalDifficulty:
bigThreshold := big.NewInt(int64(threshold))
return localState.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0, ln
outOfSync = localChainInfo.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0
default:
panic("unrecognized NodeSelectionMode: " + mode)
}

if outOfSync && n.getCachedState() == nodeStateAlive {
n.lfcLog.Errorw("RPC endpoint has fallen behind", "blockNumber", localChainInfo.BlockNumber, "bestLatestBlockNumber", ci.BlockNumber, "totalDifficulty", localChainInfo.TotalDifficulty)
}
return outOfSync, ln
}

// outOfSyncLoop takes an OutOfSync node and waits until isOutOfSync returns false to go back to live status
Expand Down Expand Up @@ -464,7 +468,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {

// received a new head - clear NoNewHead flag
syncIssues &= ^syncStatusNoNewHead
if outOfSync, _ := n.isOutOfSyncWithPool(localHighestChainInfo); !outOfSync {
if outOfSync, _ := n.isOutOfSyncWithPool(); !outOfSync {
// we caught up with the pool - clear NotInSyncWithPool flag
syncIssues &= ^syncStatusNotInSyncWithPool
} else {
Expand Down Expand Up @@ -515,7 +519,12 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
finalizedHeadsSub.ResetTimer(noNewFinalizedBlocksTimeoutThreshold)
}

lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "syncIssues", syncIssues)
var highestSeen ChainInfo
if n.poolInfoProvider != nil {
highestSeen = n.poolInfoProvider.HighestUserObservations()
}

lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "poolHighestBlockNumber", highestSeen.FinalizedBlockNumber, "syncIssues", syncIssues)
case err := <-finalizedHeadsSub.Errors:
lggr.Errorw("Finalized head subscription was terminated", "err", err)
n.declareUnreachable()
Expand Down Expand Up @@ -648,7 +657,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() {
case nodeStateClosed:
return
default:
panic(fmt.Sprintf("syncingLoop can only run for node in nodeStateSyncing state, got: %s", state))
panic(fmt.Sprintf("syncingLoop can only run for node in NodeStateSyncing state, got: %s", state))
}
}

Expand Down
46 changes: 32 additions & 14 deletions common/client/node_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
poolInfo.On("LatestChainInfo").Return(10, ChainInfo{
BlockNumber: syncThreshold + mostRecentBlock + 1,
TotalDifficulty: big.NewInt(10),
}).Once()
})
node.SetPoolChainInfoProvider(poolInfo)
// tries to redial in outOfSync
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) {
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
BlockNumber: syncThreshold + mostRecentBlock + 1,
TotalDifficulty: big.NewInt(10),
}).Once()
})
node.SetPoolChainInfoProvider(poolInfo)
node.declareAlive()
tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState))
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
t.Run("when no new heads received for threshold, transitions to out of sync", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{},
chainConfig: clientMocks.ChainConfig{
Expand All @@ -312,7 +312,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
t.Run("when no new heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{},
Expand Down Expand Up @@ -693,15 +693,25 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
t.Run("if fail to get chainID, transitions to unreachable", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[types.ID, Head](t)
chainID := types.RandomID()
node := newAliveNode(t, testNodeOpts{
rpc: rpc,
rpc: rpc,
chainID: chainID,
})
defer func() { assert.NoError(t, node.close()) }()

rpc.On("ChainID", mock.Anything).Return(chainID, nil)
// for out-of-sync
rpc.On("Dial", mock.Anything).Return(nil).Once()
// for unreachable
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()
sub := mocks.NewSubscription(t)
errChan := make(chan error, 1)
errChan <- errors.New("subscription was terminate")
sub.On("Err").Return((<-chan error)(errChan))
sub.On("Unsubscribe").Once()
rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), sub, nil)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})

expectedError := errors.New("failed to get chain ID")
// might be called multiple times
Expand Down Expand Up @@ -1025,7 +1035,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
sub.On("Err").Return((<-chan error)(errChan))
sub.On("Unsubscribe").Once()
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), sub, nil).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})

// unreachable
rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe()
Expand Down Expand Up @@ -1056,7 +1066,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Run(func(args mock.Arguments) {
close(ch)
}).Return((<-chan Head)(ch), sub, nil).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
// unreachable
rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe()
node.declareOutOfSync(syncStatusNoNewHead)
Expand All @@ -1082,7 +1092,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
defer func() { assert.NoError(t, node.close()) }()

const highestBlock = 13
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{FinalizedBlockNumber: highestBlock}, ChainInfo{FinalizedBlockNumber: highestBlock})

outOfSyncSubscription := mocks.NewSubscription(t)
outOfSyncSubscription.On("Err").Return((<-chan error)(nil))
Expand Down Expand Up @@ -1119,7 +1129,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
defer func() { assert.NoError(t, node.close()) }()

const highestBlock = 13
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock}).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{FinalizedBlockNumber: highestBlock})

outOfSyncSubscription := mocks.NewSubscription(t)
outOfSyncSubscription.On("Err").Return((<-chan error)(nil))
Expand Down Expand Up @@ -1582,15 +1592,15 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
t.Parallel()
t.Run("skip if nLiveNodes is not configured", func(t *testing.T) {
node := newTestNode(t, testNodeOpts{})
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{})
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, false, outOfSync)
assert.Equal(t, 0, liveNodes)
})
t.Run("skip if syncThreshold is not configured", func(t *testing.T) {
node := newTestNode(t, testNodeOpts{})
poolInfo := newMockPoolChainInfoProvider(t)
node.SetPoolChainInfoProvider(poolInfo)
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{})
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, false, outOfSync)
assert.Equal(t, 0, liveNodes)
})
Expand All @@ -1602,7 +1612,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{}).Once()
node.SetPoolChainInfoProvider(poolInfo)
assert.Panics(t, func() {
_, _ = node.isOutOfSyncWithPool(ChainInfo{})
_, _ = node.isOutOfSyncWithPool()
})
})
t.Run("block height selection mode", func(t *testing.T) {
Expand Down Expand Up @@ -1653,7 +1663,11 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
for _, td := range []int64{totalDifficulty - syncThreshold - 1, totalDifficulty - syncThreshold, totalDifficulty, totalDifficulty + 1} {
for _, testCase := range testCases {
t.Run(fmt.Sprintf("%s: SelectionModeVal: %s: total difficulty: %d", testCase.name, selectionMode, td), func(t *testing.T) {
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{BlockNumber: testCase.blockNumber, TotalDifficulty: big.NewInt(td)})
chainInfo := ChainInfo{BlockNumber: testCase.blockNumber, TotalDifficulty: big.NewInt(td)}
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(chainInfo, ChainInfo{}).Once()
node.rpc = rpc
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, nodesNum, liveNodes)
assert.Equal(t, testCase.outOfSync, outOfSync)
})
Expand Down Expand Up @@ -1709,7 +1723,11 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
for _, hb := range []int64{highestBlock - syncThreshold - 1, highestBlock - syncThreshold, highestBlock, highestBlock + 1} {
for _, testCase := range testCases {
t.Run(fmt.Sprintf("%s: SelectionModeVal: %s: highest block: %d", testCase.name, NodeSelectionModeTotalDifficulty, hb), func(t *testing.T) {
outOfSync, liveNodes := node.isOutOfSyncWithPool(ChainInfo{BlockNumber: hb, TotalDifficulty: big.NewInt(testCase.totalDifficulty)})
chainInfo := ChainInfo{BlockNumber: hb, TotalDifficulty: big.NewInt(testCase.totalDifficulty)}
rpc := newMockRPCClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(chainInfo, ChainInfo{}).Once()
node.rpc = rpc
outOfSync, liveNodes := node.isOutOfSyncWithPool()
assert.Equal(t, nodesNum, liveNodes)
assert.Equal(t, testCase.outOfSync, outOfSync)
})
Expand Down
Loading
Loading