diff --git a/.changeset/fast-dolphins-cry.md b/.changeset/fast-dolphins-cry.md new file mode 100644 index 00000000000..560313a8c56 --- /dev/null +++ b/.changeset/fast-dolphins-cry.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +handle connection timeout on cache path for ws client LatestReport #bugfix diff --git a/core/services/relay/evm/mercury/wsrpc/cache/cache.go b/core/services/relay/evm/mercury/wsrpc/cache/cache.go index adc439e802b..e2cf6a31a86 100644 --- a/core/services/relay/evm/mercury/wsrpc/cache/cache.go +++ b/core/services/relay/evm/mercury/wsrpc/cache/cache.go @@ -50,9 +50,8 @@ type Fetcher interface { } type Client interface { - Fetcher ServerURL() string - RawClient() pb.MercuryClient + RawLatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) } // Cache is scoped to one particular mercury server @@ -194,7 +193,7 @@ func (m *memCache) LatestReport(ctx context.Context, req *pb.LatestReportRequest } feedIDHex := mercuryutils.BytesToFeedID(req.FeedId).String() if m.cfg.LatestReportTTL <= 0 { - return m.client.RawClient().LatestReport(ctx, req) + return m.client.RawLatestReport(ctx, req) } vi, loaded := m.cache.LoadOrStore(feedIDHex, &cacheVal{ sync.RWMutex{}, @@ -311,7 +310,7 @@ func (m *memCache) fetch(req *pb.LatestReportRequest, v *cacheVal) { // NOTE: must drop down to RawClient here otherwise we enter an // infinite loop of calling a client that calls back to this same cache // and on and on - val, err = m.client.RawClient().LatestReport(ctx, req) + val, err = m.client.RawLatestReport(ctx, req) cancel() v.setError(err) if memcacheCtx.Err() != nil { diff --git a/core/services/relay/evm/mercury/wsrpc/cache/helpers_test.go b/core/services/relay/evm/mercury/wsrpc/cache/helpers_test.go index 4cc08bdd52e..f0211269a2f 100644 --- a/core/services/relay/evm/mercury/wsrpc/cache/helpers_test.go +++ b/core/services/relay/evm/mercury/wsrpc/cache/helpers_test.go @@ -13,7 +13,7 @@ type mockClient struct { err error } -func (m *mockClient) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) { +func (m *mockClient) RawLatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) { return m.resp, m.err } diff --git a/core/services/relay/evm/mercury/wsrpc/client.go b/core/services/relay/evm/mercury/wsrpc/client.go index bb1b886ed97..9cc9e23c67e 100644 --- a/core/services/relay/evm/mercury/wsrpc/client.go +++ b/core/services/relay/evm/mercury/wsrpc/client.go @@ -302,6 +302,13 @@ func (w *client) handleTimeout(err error) { } else { w.consecutiveTimeoutCnt.Store(0) } + +} + +func (w *client) RawLatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) { + resp, err = w.rawClient.LatestReport(ctx, req) + w.handleTimeout(err) + return } func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) { @@ -312,8 +319,7 @@ func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest) } var cached bool if w.cache == nil { - resp, err = w.rawClient.LatestReport(ctx, req) - w.handleTimeout(err) + resp, err = w.RawLatestReport(ctx, req) } else { cached = true resp, err = w.cache.LatestReport(ctx, req) diff --git a/core/services/relay/evm/mercury/wsrpc/client_test.go b/core/services/relay/evm/mercury/wsrpc/client_test.go index b4a3dae733d..81cbe52e857 100644 --- a/core/services/relay/evm/mercury/wsrpc/client_test.go +++ b/core/services/relay/evm/mercury/wsrpc/client_test.go @@ -176,3 +176,65 @@ func Test_Client_LatestReport(t *testing.T) { }) } } + +func Test_Client_RawLatestReport(t *testing.T) { + lggr := logger.TestLogger(t) + ctx := testutils.Context(t) + + t.Run("sends on reset channel after MaxConsecutiveRequestFailures timed out transmits", func(t *testing.T) { + noopCacheSet := newNoopCacheSet() + req := &pb.LatestReportRequest{} + calls := 0 + timeoutErr := context.DeadlineExceeded + wsrpcClient := &mocks.MockWSRPCClient{ + LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) { + calls++ + return nil, timeoutErr + }, + } + conn := &mocks.MockConn{ + Ready: true, + } + + c := newClient(lggr, csakey.KeyV2{}, nil, "", noopCacheSet) + c.conn = conn + c.rawClient = wsrpcClient + require.NoError(t, c.StartOnce("Mock WSRPC Client", func() error { return nil })) + for i := 1; i < MaxConsecutiveRequestFailures; i++ { + _, err := c.RawLatestReport(ctx, req) + require.EqualError(t, err, "context deadline exceeded") + } + assert.Equal(t, MaxConsecutiveRequestFailures-1, calls) + select { + case <-c.chResetTransport: + t.Fatal("unexpected send on chResetTransport") + default: + } + _, err := c.RawLatestReport(ctx, req) + require.EqualError(t, err, "context deadline exceeded") + assert.Equal(t, MaxConsecutiveRequestFailures, calls) + select { + case <-c.chResetTransport: + default: + t.Fatal("expected send on chResetTransport") + } + + t.Run("successful LatestReport resets the counter", func(t *testing.T) { + timeoutErr = nil + // working LatestReport to reset counter + _, err := c.RawLatestReport(ctx, req) + require.NoError(t, err) + assert.Equal(t, MaxConsecutiveRequestFailures+1, calls) + assert.Equal(t, 0, int(c.consecutiveTimeoutCnt.Load())) + }) + + t.Run("doesn't block in case channel is full", func(t *testing.T) { + timeoutErr = context.DeadlineExceeded + c.chResetTransport = nil // simulate full channel + for i := 0; i < MaxConsecutiveRequestFailures; i++ { + _, err := c.RawLatestReport(ctx, req) + require.EqualError(t, err, "context deadline exceeded") + } + }) + }) +}