diff --git a/common/client/models.go b/common/client/models.go
index bd974f901fc..d0cf42a3844 100644
--- a/common/client/models.go
+++ b/common/client/models.go
@@ -18,8 +18,15 @@ const (
 	InsufficientFunds // Tx was rejected due to insufficient funds.
 	ExceedsMaxFee // Attempt's fee was higher than the node's limit and got rejected.
 	FeeOutOfValidRange // This error is returned when we use a fee price suggested from an RPC, but the network rejects the attempt due to an invalid range(mostly used by L2 chains). Retry by requesting a new suggested fee price.
+	sendTxReturnCodeLen // tracks the number of SendTxReturnCode values. Must always be last
 )
 
+// sendTxSevereErrors - error codes which signal that the transaction would never be accepted in its current form by the node
+var sendTxSevereErrors = []SendTxReturnCode{Fatal, Underpriced, Unsupported, ExceedsMaxFee, FeeOutOfValidRange, Unknown}
+
+// sendTxSuccessfulCodes - error codes which signal that the transaction was accepted by the node
+var sendTxSuccessfulCodes = []SendTxReturnCode{Successful, TransactionAlreadyKnown}
+
 type NodeTier int
 
 const (
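The sendTxReturnCodeLen sentinel above exists so that other code can iterate every defined SendTxReturnCode and fail loudly when a new code is added without being placed into the severe or successful slices; the aggregateTxResults test later in this patch relies on exactly that. A minimal, self-contained sketch of the pattern, using a stand-in enum rather than the real codes (illustrative only, not part of the patch):

```go
package main

import "fmt"

type returnCode int

const (
	successful returnCode = iota
	fatal
	retryable
	returnCodeLen // sentinel: must stay last so the number of codes is known
)

// classified mimics the severe/successful slices: every code should appear here.
var classified = map[returnCode]string{successful: "successful", fatal: "severe"}

func main() {
	// Iterating up to the sentinel catches codes that were added later but never
	// classified; "retryable" is deliberately left out to show the failure mode.
	for code := successful; code < returnCodeLen; code++ {
		if _, ok := classified[code]; !ok {
			fmt.Printf("code %d is not classified\n", code)
		}
	}
}
```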
diff --git a/common/client/multi_node.go b/common/client/multi_node.go
index 7d55784e68f..c03c3fbcd61 100644
--- a/common/client/multi_node.go
+++ b/common/client/multi_node.go
@@ -3,7 +3,9 @@ package client
 import (
 	"context"
 	"fmt"
+	"math"
 	"math/big"
+	"slices"
 	"sync"
 	"time"
 
@@ -26,6 +28,11 @@ var (
 		Name: "multi_node_states",
 		Help: "The number of RPC nodes currently in the given state for the given chain",
 	}, []string{"network", "chainId", "state"})
+	// PromMultiNodeInvariantViolations reports violations of our assumptions
+	PromMultiNodeInvariantViolations = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "multi_node_invariant_violations",
+		Help: "The number of invariant violations",
+	}, []string{"network", "chainId", "invariant"})
 	ErroringNodeError = fmt.Errorf("no live nodes available")
 )
 
@@ -94,6 +101,7 @@ type multiNode[
 	leaseTicker *time.Ticker
 	chainFamily string
 	reportInterval time.Duration
+	sendTxSoftTimeout time.Duration // defines the maximum time to wait from the first response until the responses are evaluated
 
 	activeMu sync.RWMutex
 	activeNode Node[CHAIN_ID, HEAD, RPC_CLIENT]
@@ -101,7 +109,7 @@
 	chStop services.StopChan
 	wg sync.WaitGroup
 
-	sendOnlyErrorParser func(err error) SendTxReturnCode
+	classifySendTxError func(tx TX, err error) SendTxReturnCode
 }
 
 func NewMultiNode[
@@ -127,13 +135,16 @@ func NewMultiNode[
 	chainID CHAIN_ID,
 	chainType config.ChainType,
 	chainFamily string,
-	sendOnlyErrorParser func(err error) SendTxReturnCode,
+	classifySendTxError func(tx TX, err error) SendTxReturnCode,
+	sendTxSoftTimeout time.Duration,
 ) MultiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT] {
 	nodeSelector := newNodeSelector(selectionMode, nodes)
-
 	// Prometheus' default interval is 15s, set this to under 7.5s to avoid
 	// aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency)
 	const reportInterval = 6500 * time.Millisecond
+	if sendTxSoftTimeout == 0 {
+		sendTxSoftTimeout = QueryTimeout / 2
+	}
 	c := &multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]{
 		nodes: nodes,
 		sendonlys: sendonlys,
@@ -146,8 +157,9 @@ func NewMultiNode[
 		chStop: make(services.StopChan),
 		leaseDuration: leaseDuration,
 		chainFamily: chainFamily,
-		sendOnlyErrorParser: sendOnlyErrorParser,
+		classifySendTxError: classifySendTxError,
 		reportInterval: reportInterval,
+		sendTxSoftTimeout: sendTxSoftTimeout,
 	}
 	c.lggr.Debugf("The MultiNode is configured to use NodeSelectionMode: %s", selectionMode)
@@ -545,45 +557,188 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
 	return n.RPC().SendEmptyTransaction(ctx, newTxAttempt, seq, gasLimit, fee, fromAddress)
 }
 
-func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SendTransaction(ctx context.Context, tx TX) error {
-	main, nodeError := c.selectNode()
-	var all []SendOnlyNode[CHAIN_ID, RPC_CLIENT]
-	for _, n := range c.nodes {
-		all = append(all, n)
+type sendTxResult struct {
+	Err error
+	ResultCode SendTxReturnCode
+}
+
+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) broadcastTxAsync(ctx context.Context,
+	n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX) sendTxResult {
+	txErr := n.RPC().SendTransaction(ctx, tx)
+	c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
+	resultCode := c.classifySendTxError(tx, txErr)
+	if !slices.Contains(sendTxSuccessfulCodes, resultCode) {
+		c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
 	}
-	all = append(all, c.sendonlys...)
-	for _, n := range all {
-		if n == main {
-			// main node is used at the end for the return value
-			continue
+
+	return sendTxResult{Err: txErr, ResultCode: resultCode}
+}
+
+// collectTxResults - refer to the SendTransaction comment for implementation details.
+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) error {
+	// combine the context and the stop channel to ensure we stop when a shutdown signal is received
+	ctx, cancel := c.chStop.Ctx(ctx)
+	defer cancel()
+	requiredResults := int(math.Ceil(float64(len(c.nodes)) * sendTxQuorum))
+	errorsByCode := map[SendTxReturnCode][]error{}
+	var softTimeoutChan <-chan time.Time
+	var resultsCount int
+loop:
+	for {
+		select {
+		case <-ctx.Done():
+			c.lggr.Debugw("Failed to collect all the results before the context was done", "tx", tx, "errorsByCode", errorsByCode)
+			return ctx.Err()
+		case result := <-txResults:
+			errorsByCode[result.ResultCode] = append(errorsByCode[result.ResultCode], result.Err)
+			resultsCount++
+			if slices.Contains(sendTxSuccessfulCodes, result.ResultCode) || resultsCount >= requiredResults {
+				break loop
+			}
+		case <-softTimeoutChan:
+			c.lggr.Debugw("Send Tx soft timeout expired - returning responses we've collected so far", "tx", tx, "resultsCount", resultsCount, "requiredResults", requiredResults)
+			break loop
 		}
-		// Parallel send to all other nodes with ignored return value
-		// Async - we do not want to block the main thread with secondary nodes
-		// in case they are unreliable/slow.
-		// It is purely a "best effort" send.
-		// Resource is not unbounded because the default context has a timeout.
-		ok := c.IfNotStopped(func() {
-			// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
-			c.wg.Add(1)
+
+		if softTimeoutChan == nil {
+			tm := time.NewTimer(c.sendTxSoftTimeout)
+			softTimeoutChan = tm.C
+			// we are fine with stopping the timer at the end of the function
+			//nolint
+			defer tm.Stop()
+		}
+	}
+
+	// ignore the critical error as it's reported in reportSendTxAnomalies
+	result, _ := aggregateTxResults(errorsByCode)
+	return result
+}
+
+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) {
+	defer c.wg.Done()
+	resultsByCode := map[SendTxReturnCode][]error{}
+	// txResults will eventually be closed
+	for txResult := range txResults {
+		resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err)
+	}
+
+	_, criticalErr := aggregateTxResults(resultsByCode)
+	if criticalErr != nil {
+		c.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr)
+		c.SvcErrBuffer.Append(criticalErr)
+		PromMultiNodeInvariantViolations.WithLabelValues(c.chainFamily, c.chainID.String(), criticalErr.Error()).Inc()
+	}
+}
+
+func aggregateTxResults(resultsByCode map[SendTxReturnCode][]error) (txResult error, err error) {
+	severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors)
+	successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes)
+	if hasSuccess {
+		// We assume that a primary node would never report a false-positive txResult for a transaction.
+		// Thus, if such a case occurs it's probably due to misconfiguration or a bug and requires manual intervention.
+		if hasSevereErrors {
+			const errMsg = "found contradictions in nodes replies on SendTransaction: got success and severe error"
+			// return success, since at least 1 node has accepted our broadcasted Tx, and thus it can now be included onchain
+			return successResults[0], fmt.Errorf(errMsg)
+		}
+
+		// other errors are temporary - we are safe to return success
+		return successResults[0], nil
+	}
+
+	if hasSevereErrors {
+		return severeErrors[0], nil
+	}
+
+	// return a temporary error
+	for _, result := range resultsByCode {
+		return result[0], nil
+	}
+
+	err = fmt.Errorf("expected at least one response on SendTransaction")
+	return err, err
+}
+
+const sendTxQuorum = 0.7
+
+// SendTransaction - broadcasts the transaction to all the send-only and primary nodes, regardless of their health.
+// A returned nil or error does not guarantee that the transaction will or won't be included. Additional checks must be
+// performed to determine the final state.
+//
+// Send-only nodes' results are ignored, as they tend to return false-positive responses. Broadcasting to them is still
+// necessary to speed up the propagation of the TX in the network.
+//
+// Handling of primary nodes' results consists of collection and aggregation.
+// In the collection step, we gather as many results as possible while minimizing waiting time. This operation succeeds
+// on one of the following conditions:
+// * Received at least one success
+// * Received at least one result and `sendTxSoftTimeout` expired
+// * Received results from a sufficient number of nodes, as defined by sendTxQuorum.
+// The aggregation is based on the following conditions:
+// * If there is at least one success - returns success
+// * If there is at least one terminal error - returns the terminal error
+// * If there are both a success and a terminal error - returns success and reports an invariant violation
+// * Otherwise, returns any (effectively random) of the errors.
+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SendTransaction(ctx context.Context, tx TX) error {
+	if len(c.nodes) == 0 {
+		return ErroringNodeError
+	}
+
+	txResults := make(chan sendTxResult, len(c.nodes))
+	// Must wrap inside IfNotStopped to avoid the waitgroup racing with Close
+	ok := c.IfNotStopped(func() {
+		c.wg.Add(len(c.sendonlys))
+		// fire-and-forget, as sendonly nodes can not be trusted with result reporting
+		for _, n := range c.sendonlys {
 			go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
 				defer c.wg.Done()
+				c.broadcastTxAsync(ctx, n, tx)
+			}(n)
+		}
-				txErr := n.RPC().SendTransaction(ctx, tx)
-				c.lggr.Debugw("Sendonly node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
-				sendOnlyError := c.sendOnlyErrorParser(txErr)
-				if sendOnlyError != Successful {
-					c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
-				}
+		var primaryBroadcastWg sync.WaitGroup
+		primaryBroadcastWg.Add(len(c.nodes))
+		txResultsToReport := make(chan sendTxResult, len(c.nodes))
+		for _, n := range c.nodes {
+			go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
+				defer primaryBroadcastWg.Done()
+				result := c.broadcastTxAsync(ctx, n, tx)
+				// both channels are sufficiently buffered, so we won't block here
+				txResultsToReport <- result
+				txResults <- result
 			}(n)
-		})
-		if !ok {
-			c.lggr.Debug("Cannot send transaction on sendonly node; MultiNode is stopped", "node", n.String())
 		}
+
+		c.wg.Add(1)
+		go func() {
+			// wait for the primary nodes to finish the broadcast before closing the channels
+			primaryBroadcastWg.Wait()
+			close(txResultsToReport)
+			close(txResults)
+			c.wg.Done()
+		}()
+
+		c.wg.Add(1)
+		go c.reportSendTxAnomalies(tx, txResultsToReport)
+	})
+	if !ok {
+		return fmt.Errorf("aborted while broadcasting tx - multiNode is stopped: %w", context.Canceled)
 	}
-	if nodeError != nil {
-		return nodeError
+
+	return c.collectTxResults(ctx, tx, txResults)
+}
+
+// findFirstIn - returns the value of the first key from keys that is present in the map
+func findFirstIn[K comparable, V any](set map[K]V, keys []K) (V, bool) {
+	for _, k := range keys {
+		if v, ok := set[k]; ok {
+			return v, true
+		}
 	}
-	return main.RPC().SendTransaction(ctx, tx)
+	var v V
+	return v, false
 }
 
 func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SequenceAt(ctx context.Context, account ADDR, blockNumber *big.Int) (s SEQ, err error) {
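For orientation, the collection step added above works roughly as follows: the quorum is ceil(len(nodes) * sendTxQuorum), so with 4 primary nodes and the 0.7 quorum 3 results are required, and the soft timeout is only armed after the first response arrives. The following is a simplified, self-contained sketch of that loop; the types and values are invented for the example, and this is not the production code:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

type result struct{ success bool }

// collect gathers results until a success arrives, a quorum of responses is
// reached, or a soft timeout (armed after the first response) expires.
func collect(results <-chan result, nodeCount int, softTimeout time.Duration) []result {
	required := int(math.Ceil(float64(nodeCount) * 0.7)) // same quorum rule as sendTxQuorum
	var collected []result
	var timeout <-chan time.Time // stays nil until the first result arrives
	for {
		select {
		case r := <-results:
			collected = append(collected, r)
			if r.success || len(collected) >= required {
				return collected
			}
		case <-timeout:
			// soft timeout expired: return whatever has been collected so far
			return collected
		}
		if timeout == nil {
			// arm the soft timeout only after the first response, as the patch does
			timeout = time.After(softTimeout)
		}
	}
}

func main() {
	results := make(chan result, 4)
	results <- result{success: false}
	results <- result{success: true}
	fmt.Println(collect(results, 4, 100*time.Millisecond)) // stops at the success
}
```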
diff --git a/common/client/multi_node_test.go b/common/client/multi_node_test.go
index 82af7411080..3ffc82572c2 100644
--- a/common/client/multi_node_test.go
+++ b/common/client/multi_node_test.go
@@ -1,9 +1,10 @@
 package client
 
 import (
+	"context"
 	"errors"
 	"fmt"
-	big "math/big"
+	"math/big"
 	"math/rand"
 	"testing"
 	"time"
@@ -38,7 +39,8 @@ type multiNodeOpts struct {
 	chainID types.ID
 	chainType config.ChainType
 	chainFamily string
-	sendOnlyErrorParser func(err error) SendTxReturnCode
+	classifySendTxError func(tx any, err error) SendTxReturnCode
+	sendTxSoftTimeout time.Duration
 }
 
 func newTestMultiNode(t *testing.T, opts multiNodeOpts) testMultiNode {
@@ -49,7 +51,7 @@ func newTestMultiNode(t *testing.T, opts multiNodeOpts) testMultiNode {
 	result := NewMultiNode[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
 		types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], multiNodeRPCClient](opts.logger,
 		opts.selectionMode, opts.leaseDuration, opts.noNewHeadsThreshold, opts.nodes, opts.sendonlys,
-		opts.chainID, opts.chainType, opts.chainFamily, opts.sendOnlyErrorParser)
+		opts.chainID, opts.chainType, opts.chainFamily, opts.classifySendTxError, opts.sendTxSoftTimeout)
 
 	return testMultiNode{
 		result.(*multiNode[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
 			types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], multiNodeRPCClient]),
@@ -559,77 +561,252 @@ func TestMultiNode_BatchCallContextAll(t *testing.T) {
 
 func TestMultiNode_SendTransaction(t *testing.T) {
 	t.Parallel()
-	t.Run("Fails if failed to select active node", func(t *testing.T) {
-		chainID := types.RandomID()
-		mn := newTestMultiNode(t, multiNodeOpts{
+	classifySendTxError := func(tx any, err error) SendTxReturnCode {
+		if err != nil {
+			return Fatal
+		}
+
+		return Successful
+	}
+	newNode := func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, types.Head[Hashable], multiNodeRPCClient] {
+		rpc := newMultiNodeRPCClient(t)
+		rpc.On("SendTransaction", mock.Anything, mock.Anything).Return(txErr).Run(sendTxRun).Maybe()
+		node := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
+		node.On("String").Return("node name").Maybe()
+		node.On("RPC").Return(rpc).Maybe()
+		node.On("Close").Return(nil).Once()
+		return node
+	}
+	newStartedMultiNode := func(t *testing.T, opts multiNodeOpts) testMultiNode {
+		mn := newTestMultiNode(t, opts)
+		err := mn.StartOnce("startedTestMultiNode", func() error { return nil })
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			require.NoError(t, mn.Close())
+		})
+		return mn
+	}
+	t.Run("Fails if there are no nodes available", func(t *testing.T) {
+		mn := newStartedMultiNode(t, multiNodeOpts{
 			selectionMode: NodeSelectionModeRoundRobin,
-			chainID: chainID,
+			chainID: types.RandomID(),
 		})
-		nodeSelector := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
-		nodeSelector.On("Select").Return(nil).Once()
-		nodeSelector.On("Name").Return("MockedNodeSelector").Once()
-		mn.nodeSelector = nodeSelector
 		err := mn.SendTransaction(tests.Context(t), nil)
-		require.EqualError(t, err, ErroringNodeError.Error())
+		assert.EqualError(t, err, ErroringNodeError.Error())
 	})
-	t.Run("Returns error if RPC call fails for active node", func(t *testing.T) {
+	t.Run("Transaction failure happy path", func(t *testing.T) {
 		chainID := types.RandomID()
-		rpc := newMultiNodeRPCClient(t)
-		expectedError := errors.New("rpc failed to do the batch call")
-		rpc.On("SendTransaction", mock.Anything, mock.Anything).Return(expectedError).Once()
-		node := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
-		node.On("RPC").Return(rpc)
-		nodeSelector := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
-		nodeSelector.On("Select").Return(node).Once()
-		mn := newTestMultiNode(t, multiNodeOpts{
-			selectionMode: NodeSelectionModeRoundRobin,
-			chainID: chainID,
+		expectedError := errors.New("transaction failed")
+		mainNode := newNode(t, expectedError, nil)
+		lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
+		mn := newStartedMultiNode(t, multiNodeOpts{
+			selectionMode: NodeSelectionModeRoundRobin,
+			chainID: chainID,
+			nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{mainNode},
+			sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{newNode(t, errors.New("unexpected error"), nil)},
+			classifySendTxError: classifySendTxError,
+			logger: lggr,
 		})
-		mn.nodeSelector = nodeSelector
 		err := mn.SendTransaction(tests.Context(t), nil)
 		require.EqualError(t, err, expectedError.Error())
+		tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 2)
+		tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 2)
 	})
-	t.Run("Returns result of main node and logs secondary nodes results", func(t *testing.T) {
-		// setup RPCs
-		failedRPC := newMultiNodeRPCClient(t)
-		failedRPC.On("SendTransaction", mock.Anything, mock.Anything).
-			Return(errors.New("rpc failed to do the batch call")).Once()
-		okRPC := newMultiNodeRPCClient(t)
-		okRPC.On("SendTransaction", mock.Anything, mock.Anything).Return(nil).Twice()
-
-		// setup ok and failed auxiliary nodes
-		okNode := newMockSendOnlyNode[types.ID, multiNodeRPCClient](t)
-		okNode.On("RPC").Return(okRPC).Once()
-		okNode.On("String").Return("okNode")
-		failedNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
-		failedNode.On("RPC").Return(failedRPC).Once()
-		failedNode.On("String").Return("failedNode")
-
-		// setup main node
-		mainNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
-		mainNode.On("RPC").Return(okRPC)
-		nodeSelector := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
-		nodeSelector.On("Select").Return(mainNode).Once()
+	t.Run("Transaction success happy path", func(t *testing.T) {
+		chainID := types.RandomID()
+		mainNode := newNode(t, nil, nil)
 		lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
+		mn := newStartedMultiNode(t, multiNodeOpts{
+			selectionMode: NodeSelectionModeRoundRobin,
+			chainID: chainID,
+			nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{mainNode},
+			sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{newNode(t, errors.New("unexpected error"), nil)},
+			classifySendTxError: classifySendTxError,
+			logger: lggr,
+		})
+		err := mn.SendTransaction(tests.Context(t), nil)
+		require.NoError(t, err)
+		tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 2)
+		tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 1)
+	})
+	t.Run("Context expired before collecting sufficient results", func(t *testing.T) {
+		chainID := types.RandomID()
+		testContext, testCancel := context.WithCancel(tests.Context(t))
+		defer testCancel()
+		mainNode := newNode(t, errors.New("unexpected error"), func(_ mock.Arguments) {
+			// block the caller until the end of the test
+			<-testContext.Done()
+		})
+		mn := newStartedMultiNode(t, multiNodeOpts{
+			selectionMode: NodeSelectionModeRoundRobin,
+			chainID: chainID,
+			nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{mainNode},
+			classifySendTxError: classifySendTxError,
+		})
+		requestContext, cancel := context.WithCancel(tests.Context(t))
+		cancel()
+		err := mn.SendTransaction(requestContext, nil)
+		require.EqualError(t, err, "context canceled")
+	})
+	t.Run("Soft timeout stops results collection", func(t *testing.T) {
+		chainID := types.RandomID()
+		expectedError := errors.New("tmp error")
+		fastNode := newNode(t, expectedError, nil)
+		// hold the reply from the node until the end of the test
+		testContext, testCancel := context.WithCancel(tests.Context(t))
+		defer testCancel()
+		slowNode := newNode(t, errors.New("transaction failed"), func(_ mock.Arguments) {
+			// block the caller until the end of the test
+			<-testContext.Done()
+		})
+		mn := newStartedMultiNode(t, multiNodeOpts{
+			selectionMode: NodeSelectionModeRoundRobin,
+			chainID: chainID,
+			nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{fastNode, slowNode},
+			classifySendTxError: classifySendTxError,
+			sendTxSoftTimeout: tests.TestInterval,
+		})
+		err := mn.SendTransaction(tests.Context(t), nil)
+		require.EqualError(t, err, expectedError.Error())
+	})
+	t.Run("Returns success without waiting for the rest of the nodes", func(t *testing.T) {
+		chainID := types.RandomID()
+		fastNode := newNode(t, nil, nil)
+		// hold the reply from the node until the end of the test
+		testContext, testCancel := context.WithCancel(tests.Context(t))
+		defer testCancel()
+		slowNode := newNode(t, errors.New("transaction failed"), func(_ mock.Arguments) {
+			// block the caller until the end of the test
+			<-testContext.Done()
+		})
+		slowSendOnly := newNode(t, errors.New("send only failed"), func(_ mock.Arguments) {
+			// block the caller until the end of the test
+			<-testContext.Done()
+		})
+		lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)
 		mn := newTestMultiNode(t, multiNodeOpts{
-			selectionMode: NodeSelectionModeRoundRobin,
-			chainID: types.RandomID(),
-			nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{failedNode, mainNode},
-			sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{okNode},
-			logger: lggr,
-			sendOnlyErrorParser: func(err error) SendTxReturnCode {
-				if err != nil {
-					return Fatal
-				}
-
-				return Successful
-			},
+			logger: lggr,
+			selectionMode: NodeSelectionModeRoundRobin,
+			chainID: chainID,
+			nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{fastNode, slowNode},
+			sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{slowSendOnly},
+			classifySendTxError: classifySendTxError,
+			sendTxSoftTimeout: tests.TestInterval,
 		})
-		mn.nodeSelector = nodeSelector
-
+		assert.NoError(t, mn.StartOnce("startedTestMultiNode", func() error { return nil }))
 		err := mn.SendTransaction(tests.Context(t), nil)
 		require.NoError(t, err)
-		tests.AssertLogEventually(t, observedLogs, "Sendonly node sent transaction")
-		tests.AssertLogEventually(t, observedLogs, "RPC returned error")
+		testCancel()
+		require.NoError(t, mn.Close())
+		tests.AssertLogEventually(t, observedLogs, "observed invariant violation on SendTransaction")
+	})
+	t.Run("Fails when closed", func(t *testing.T) {
+		mn := newTestMultiNode(t, multiNodeOpts{
+			selectionMode: NodeSelectionModeRoundRobin,
+			chainID: types.RandomID(),
+			nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{newNode(t, nil, nil)},
+			sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{newNode(t, nil, nil)},
+			classifySendTxError: classifySendTxError,
+		})
+		err := mn.StartOnce("startedTestMultiNode", func() error { return nil })
+		require.NoError(t, err)
+		require.NoError(t, mn.Close())
+		err = mn.SendTransaction(tests.Context(t), nil)
+		require.EqualError(t, err, "aborted while broadcasting tx - multiNode is stopped: context canceled")
 	})
 }
+
+func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {
+	t.Parallel()
+	// ensure failure on a new SendTxReturnCode
+	codesToCover := map[SendTxReturnCode]struct{}{}
+	for code := Successful; code < sendTxReturnCodeLen; code++ {
+		codesToCover[code] = struct{}{}
+	}
+
+	testCases := []struct {
+		Name string
+		ExpectedTxResult string
+		ExpectedCriticalErr string
+		ResultsByCode map[SendTxReturnCode][]error
+	}{
+		{
+			Name: "Returns success and logs critical error on success and Fatal",
+			ExpectedTxResult: "success",
+			ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error",
+			ResultsByCode: map[SendTxReturnCode][]error{
+				Successful: {errors.New("success")},
+				Fatal: {errors.New("fatal")},
+			},
+		},
+		{
+			Name: "Returns TransactionAlreadyKnown and logs critical error on TransactionAlreadyKnown and Fatal",
+			ExpectedTxResult: "tx_already_known",
+			ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error",
+			ResultsByCode: map[SendTxReturnCode][]error{
+				TransactionAlreadyKnown: {errors.New("tx_already_known")},
+				Unsupported: {errors.New("unsupported")},
+			},
+		},
+		{
+			Name: "Prefers severe error to temporary",
+			ExpectedTxResult: "underpriced",
+			ExpectedCriticalErr: "",
+			ResultsByCode: map[SendTxReturnCode][]error{
+				Retryable: {errors.New("retryable")},
+				Underpriced: {errors.New("underpriced")},
+			},
+		},
+		{
+			Name: "Returns temporary error",
+			ExpectedTxResult: "retryable",
+			ExpectedCriticalErr: "",
+			ResultsByCode: map[SendTxReturnCode][]error{
+				Retryable: {errors.New("retryable")},
+			},
+		},
+		{
+			Name: "Insufficient funds is treated as error",
+			ExpectedTxResult: "",
+			ExpectedCriticalErr: "",
+			ResultsByCode: map[SendTxReturnCode][]error{
+				Successful: {nil},
+				InsufficientFunds: {errors.New("insufficientFunds")},
+			},
+		},
+		{
+			Name: "Logs critical error on empty ResultsByCode",
+			ExpectedTxResult: "expected at least one response on SendTransaction",
+			ExpectedCriticalErr: "expected at least one response on SendTransaction",
+			ResultsByCode: map[SendTxReturnCode][]error{},
+		},
+	}
+
+	for _, testCase := range testCases {
+		for code := range testCase.ResultsByCode {
+			delete(codesToCover, code)
+		}
+		t.Run(testCase.Name, func(t *testing.T) {
+			txResult, err := aggregateTxResults(testCase.ResultsByCode)
+			if testCase.ExpectedTxResult == "" {
+				assert.NoError(t, err)
+			} else {
+				assert.EqualError(t, txResult, testCase.ExpectedTxResult)
+			}
+
+			if testCase.ExpectedCriticalErr == "" {
+				assert.NoError(t, err)
+			} else {
+				assert.EqualError(t, err, testCase.ExpectedCriticalErr)
+			}
+		})
+	}
+
+	// Explicitly signal that the following codes are properly handled in aggregateTxResults,
+	// but dedicated test cases won't be beneficial.
+	for _, codeToIgnore := range []SendTxReturnCode{Unknown, ExceedsMaxFee, FeeOutOfValidRange} {
+		delete(codesToCover, codeToIgnore)
+	}
+	assert.Empty(t, codesToCover, "all of the SendTxReturnCode values must be covered by this test")
+}
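The precedence exercised by the aggregateTxResults cases above (success first, then severe errors, then any temporary error, with a critical error reported when a success and a severe error contradict each other) can be seen in isolation in the rough sketch below. It uses a stand-in enum and is not the production implementation:

```go
package main

import (
	"errors"
	"fmt"
)

type code int

const (
	successful code = iota
	fatal
	retryable
)

// findFirstIn mirrors the generic helper added in multi_node.go: it returns the
// value of the first key (in priority order) that is present in the map.
func findFirstIn[K comparable, V any](set map[K]V, keys []K) (V, bool) {
	for _, k := range keys {
		if v, ok := set[k]; ok {
			return v, true
		}
	}
	var zero V
	return zero, false
}

// aggregate approximates the precedence: success, then severe, then temporary.
func aggregate(results map[code][]error) (txResult error, critical error) {
	severe, hasSevere := findFirstIn(results, []code{fatal})
	success, hasSuccess := findFirstIn(results, []code{successful})
	switch {
	case hasSuccess && hasSevere:
		// contradiction: report it, but still return the success
		return success[0], errors.New("got success and severe error")
	case hasSuccess:
		return success[0], nil
	case hasSevere:
		return severe[0], nil
	}
	for _, errs := range results {
		return errs[0], nil // any temporary error
	}
	return nil, errors.New("expected at least one response")
}

func main() {
	res, crit := aggregate(map[code][]error{
		retryable: {errors.New("nonce too low")},
		fatal:     {errors.New("invalid chain id")},
	})
	fmt.Println(res, crit) // the severe error wins over the temporary one
}
```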
diff --git a/core/chains/evm/client/chain_client.go b/core/chains/evm/client/chain_client.go
index 5dd70992382..2a5a37da47c 100644
--- a/core/chains/evm/client/chain_client.go
+++ b/core/chains/evm/client/chain_client.go
@@ -73,7 +73,10 @@ func NewChainClient(
 		chainID,
 		chainType,
 		"EVM",
-		ClassifySendOnlyError,
+		func(tx *types.Transaction, err error) commonclient.SendTxReturnCode {
+			return ClassifySendError(err, logger.Sugared(logger.Nop()), tx, common.Address{}, chainType.IsL2())
+		},
+		0, // use the default value provided by the implementation
 	)
 	return &chainClient{
 		multiNode: multiNode,
diff --git a/core/chains/evm/client/errors.go b/core/chains/evm/client/errors.go
index bb748cb52fb..67197b764a0 100644
--- a/core/chains/evm/client/errors.go
+++ b/core/chains/evm/client/errors.go
@@ -11,6 +11,7 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
+
 	commonclient "github.com/smartcontractkit/chainlink/v2/common/client"
 	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/label"
 )
@@ -311,6 +312,17 @@ func (s *SendError) IsTimeout() bool {
 	return errors.Is(s.err, context.DeadlineExceeded)
 }
 
+// IsCanceled indicates if the error was caused by a context cancellation
+func (s *SendError) IsCanceled() bool {
+	if s == nil {
+		return false
+	}
+	if s.err == nil {
+		return false
+	}
+	return errors.Is(s.err, context.Canceled)
+}
+
 func NewFatalSendError(e error) *SendError {
 	if e == nil {
 		return nil
@@ -475,6 +487,10 @@ func ClassifySendError(err error, lggr logger.SugaredLogger, tx *types.Transacti
 		lggr.Errorw("timeout while sending transaction %x", tx.Hash(), "err", sendError, "etx", tx)
 		return commonclient.Retryable
 	}
+	if sendError.IsCanceled() {
+		lggr.Errorw("context was canceled while sending transaction %x", tx.Hash(), "err", sendError, "etx", tx)
+		return commonclient.Retryable
+	}
 	if sendError.IsTxFeeExceedsCap() {
 		lggr.Criticalw(fmt.Sprintf("Sending transaction failed: %s", label.RPCTxFeeCapConfiguredIncorrectlyWarning),
 			"etx", tx,
@@ -486,15 +502,3 @@ func ClassifySendError(err error, lggr logger.SugaredLogger, tx *types.Transacti
 	lggr.Errorw("Unknown error encountered when sending transaction", "err", err, "etx", tx)
 	return commonclient.Unknown
 }
-
-// ClassifySendOnlyError handles SendOnly nodes error codes. In that case, we don't assume there is another transaction that will be correctly
-// priced.
-func ClassifySendOnlyError(err error) commonclient.SendTxReturnCode {
-	sendError := NewSendError(err)
-	if sendError == nil || sendError.IsNonceTooLowError() || sendError.IsTransactionAlreadyMined() || sendError.IsTransactionAlreadyInMempool() {
-		// Nonce too low or transaction known errors are expected since
-		// the primary SendTransaction may well have succeeded already
-		return commonclient.Successful
-	}
-	return commonclient.Fatal
-}
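The new IsCanceled branch relies on errors.Is unwrapping context.Canceled, so a cancellation that reaches ClassifySendError wrapped inside another error is still mapped to Retryable rather than falling through to Unknown. A tiny, self-contained check of that unwrapping behaviour (illustrative only; the message simply mirrors the one used elsewhere in this patch):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// A wrapped cancellation, similar in shape to the error SendTransaction
	// returns when the MultiNode is stopped mid-broadcast.
	err := fmt.Errorf("aborted while broadcasting tx - multiNode is stopped: %w", context.Canceled)

	// errors.Is walks the wrap chain, which is what lets IsCanceled detect the
	// cancellation and lets ClassifySendError map it to a retryable outcome.
	fmt.Println(errors.Is(err, context.Canceled)) // true
}
```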