From c6922dac1b0693831c69fdc886418abc915ff4c0 Mon Sep 17 00:00:00 2001 From: aramalipoor Date: Fri, 30 Aug 2024 01:06:18 +0200 Subject: [PATCH] fix: remove upstream read lock during forward loop --- common/errors.go | 29 ++++++++++-------------- erpc/http_server_test.go | 4 ++-- erpc/networks.go | 4 +--- erpc/networks_test.go | 30 ++++++++++++++----------- upstream/failsafe.go | 38 ++++++++++++++++++++++++-------- upstream/http_json_rpc_client.go | 13 ----------- upstream/registry.go | 10 ++++----- upstream/registry_test.go | 2 +- upstream/upstream.go | 13 ++--------- 9 files changed, 69 insertions(+), 74 deletions(-) diff --git a/common/errors.go b/common/errors.go index 724fcfbc..56440279 100644 --- a/common/errors.go +++ b/common/errors.go @@ -303,7 +303,7 @@ var NewErrRequestTimeout = func(timeout time.Duration) error { } func (e *ErrRequestTimeout) ErrorStatusCode() int { - return http.StatusRequestTimeout + return http.StatusGatewayTimeout } type ErrInternalServerError struct{ BaseError } @@ -512,7 +512,7 @@ var NewErrUpstreamClientInitialization = func(cause error, upstreamId string) er type ErrUpstreamRequest struct{ BaseError } -var NewErrUpstreamRequest = func(cause error, prjId, netId, upsId, method string, duration time.Duration, attempts, retries, hedges int) error { +var NewErrUpstreamRequest = func(cause error, upsId string, duration time.Duration, attempts, retries, hedges int) error { return &ErrUpstreamRequest{ BaseError{ Code: "ErrUpstreamRequest", @@ -520,10 +520,7 @@ var NewErrUpstreamRequest = func(cause error, prjId, netId, upsId, method string Cause: cause, Details: map[string]interface{}{ "durationMs": duration.Milliseconds(), - "projectId": prjId, - "networkId": netId, "upstreamId": upsId, - "method": method, "attempts": attempts, "retries": retries, "hedges": hedges, @@ -687,7 +684,7 @@ func (e *ErrUpstreamsExhausted) SummarizeCauses() string { timeout := 0 serverError := 0 rateLimit := 0 - down := 0 + cbOpen := 0 billing := 0 other := 0 cancelled := 0 @@ -706,7 +703,7 @@ func (e *ErrUpstreamsExhausted) SummarizeCauses() string { billing++ continue } else if HasErrorCode(e, ErrCodeFailsafeCircuitBreakerOpen) { - down++ + cbOpen++ continue } else if errors.Is(e, context.DeadlineExceeded) || HasErrorCode(e, ErrCodeEndpointRequestTimeout) { timeout++ @@ -738,20 +735,20 @@ func (e *ErrUpstreamsExhausted) SummarizeCauses() string { if rateLimit > 0 { reasons = append(reasons, fmt.Sprintf("%d rate limited", rateLimit)) } - if down > 0 { - reasons = append(reasons, fmt.Sprintf("%d down", down)) + if cbOpen > 0 { + reasons = append(reasons, fmt.Sprintf("%d circuit breaker open", cbOpen)) } if billing > 0 { reasons = append(reasons, fmt.Sprintf("%d billing issues", billing)) } - if other > 0 { - reasons = append(reasons, fmt.Sprintf("%d other errors", other)) - } if cancelled > 0 { reasons = append(reasons, fmt.Sprintf("%d hedges cancelled", cancelled)) } + if other > 0 { + reasons = append(reasons, fmt.Sprintf("%d other errors", other)) + } - return strings.Join(reasons, " + ") + return strings.Join(reasons, ", ") } return "" @@ -886,8 +883,7 @@ type ErrUpstreamRequestSkipped struct{ BaseError } const ErrCodeUpstreamRequestSkipped ErrorCode = "ErrUpstreamRequestSkipped" -var NewErrUpstreamRequestSkipped = func(reason error, upstreamId string, req *NormalizedRequest) error { - m, _ := req.Method() +var NewErrUpstreamRequestSkipped = func(reason error, upstreamId string) error { return &ErrUpstreamRequestSkipped{ BaseError{ Code: ErrCodeUpstreamRequestSkipped, @@ -895,7 +891,6 @@ var NewErrUpstreamRequestSkipped = func(reason error, upstreamId string, req *No Cause: reason, Details: map[string]interface{}{ "upstreamId": upstreamId, - "method": m, }, }, } @@ -1186,7 +1181,7 @@ var NewErrNetworkRequestTimeout = func(duration time.Duration) error { } func (e *ErrNetworkRequestTimeout) ErrorStatusCode() int { - return http.StatusRequestTimeout + return http.StatusGatewayTimeout } type ErrUpstreamRateLimitRuleExceeded struct{ BaseError } diff --git a/erpc/http_server_test.go b/erpc/http_server_test.go index eb06ba18..7a21fef6 100644 --- a/erpc/http_server_test.go +++ b/erpc/http_server_test.go @@ -139,7 +139,7 @@ func TestHttpServer_RaceTimeouts(t *testing.T) { wg.Wait() for _, result := range results { - if result.statusCode != http.StatusGatewayTimeout && result.statusCode != http.StatusRequestTimeout { + if result.statusCode != http.StatusGatewayTimeout { t.Errorf("unexpected status code: %d", result.statusCode) } assert.Contains(t, result.body, "Timeout") @@ -210,7 +210,7 @@ func TestHttpServer_RaceTimeouts(t *testing.T) { timeouts := 0 successes := 0 for _, result := range results { - if result.statusCode == http.StatusGatewayTimeout || result.statusCode == http.StatusRequestTimeout { + if result.statusCode == http.StatusGatewayTimeout { timeouts++ assert.Contains(t, result.body, "Timeout") } else { diff --git a/erpc/networks.go b/erpc/networks.go index bf9c5412..e4c23405 100644 --- a/erpc/networks.go +++ b/erpc/networks.go @@ -162,8 +162,6 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (* } upsList, err := n.upstreamsRegistry.GetSortedUpstreams(n.NetworkId, method) - n.upstreamsRegistry.RLockUpstreams() - defer n.upstreamsRegistry.RUnlockUpstreams() if err != nil { inf.Close(nil, err) return nil, err @@ -257,7 +255,7 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (* }) if execErr != nil { - err := upstream.TranslateFailsafeError(execErr) + err := upstream.TranslateFailsafeError("", method, execErr) // If error is due to empty response be generous and accept it, // because this means after many retries still no data is available. if common.HasErrorCode(err, common.ErrCodeFailsafeRetryExceeded) { diff --git a/erpc/networks_test.go b/erpc/networks_test.go index d53e3f35..9034ea8f 100644 --- a/erpc/networks_test.go +++ b/erpc/networks_test.go @@ -3872,7 +3872,7 @@ func TestNetwork_Forward(t *testing.T) { rateLimitersRegistry, ) - network, err := networksRegistry.RegisterNetwork( + _, err = networksRegistry.RegisterNetwork( &logger, &common.ProjectConfig{Id: projectID}, &common.NetworkConfig{ @@ -3882,7 +3882,7 @@ func TestNetwork_Forward(t *testing.T) { ) assert.NoError(t, err) - simulateRequests := func(method string, upstreamId string, latency time.Duration) { + mockRequests := func(method string, upstreamId string, latency time.Duration) { gock.New("http://" + upstreamId + ".localhost"). Persist(). Post("/"). @@ -3897,15 +3897,15 @@ func TestNetwork_Forward(t *testing.T) { } // Upstream A is faster for eth_call, Upstream B is faster for eth_traceTransaction, Upstream C is faster for eth_getLogs - simulateRequests("eth_getLogs", "upstream-a", 200*time.Millisecond) - simulateRequests("eth_getLogs", "upstream-b", 100*time.Millisecond) - simulateRequests("eth_getLogs", "upstream-c", 50*time.Millisecond) - simulateRequests("eth_traceTransaction", "upstream-a", 100*time.Millisecond) - simulateRequests("eth_traceTransaction", "upstream-b", 50*time.Millisecond) - simulateRequests("eth_traceTransaction", "upstream-c", 200*time.Millisecond) - simulateRequests("eth_call", "upstream-a", 50*time.Millisecond) - simulateRequests("eth_call", "upstream-b", 200*time.Millisecond) - simulateRequests("eth_call", "upstream-c", 100*time.Millisecond) + mockRequests("eth_getLogs", "upstream-a", 200*time.Millisecond) + mockRequests("eth_getLogs", "upstream-b", 100*time.Millisecond) + mockRequests("eth_getLogs", "upstream-c", 50*time.Millisecond) + mockRequests("eth_traceTransaction", "upstream-a", 100*time.Millisecond) + mockRequests("eth_traceTransaction", "upstream-b", 50*time.Millisecond) + mockRequests("eth_traceTransaction", "upstream-c", 200*time.Millisecond) + mockRequests("eth_call", "upstream-a", 50*time.Millisecond) + mockRequests("eth_call", "upstream-b", 200*time.Millisecond) + mockRequests("eth_call", "upstream-c", 100*time.Millisecond) allMethods := []string{"eth_getLogs", "eth_traceTransaction", "eth_call"} @@ -3917,14 +3917,18 @@ func TestNetwork_Forward(t *testing.T) { wg := sync.WaitGroup{} for _, method := range allMethods { - for i := 0; i < 500; i++ { + for i := 0; i < 1; i++ { wg.Add(1) go func(method string) { defer wg.Done() upstreamsRegistry.RefreshUpstreamNetworkMethodScores() req := common.NewNormalizedRequest([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"%s","params":[],"id":1}`, method))) - _, err := network.Forward(ctx, req) + ups, err := upstreamsRegistry.GetSortedUpstreams(networkID, method) assert.NoError(t, err) + for _, up := range ups { + _, err = up.Forward(ctx, req) + assert.NoError(t, err) + } }(method) time.Sleep(1 * time.Millisecond) } diff --git a/upstream/failsafe.go b/upstream/failsafe.go index eae59c50..bde81d38 100644 --- a/upstream/failsafe.go +++ b/upstream/failsafe.go @@ -1,6 +1,7 @@ package upstream import ( + "context" "errors" "fmt" "time" @@ -323,7 +324,8 @@ func createTimeoutPolicy(component string, cfg *common.TimeoutPolicyConfig) (fai return builder.Build(), nil } -func TranslateFailsafeError(execErr error) error { +func TranslateFailsafeError(upstreamId, method string, execErr error) error { + var err error var retryExceededErr retrypolicy.ExceededError if errors.As(execErr, &retryExceededErr) { ler := retryExceededErr.LastError @@ -334,17 +336,35 @@ func TranslateFailsafeError(execErr error) error { } var translatedCause error if ler != nil { - translatedCause = TranslateFailsafeError(ler) + translatedCause = TranslateFailsafeError("", "", ler) } - return common.NewErrFailsafeRetryExceeded(translatedCause) + err = common.NewErrFailsafeRetryExceeded(translatedCause) + } else if errors.Is(execErr, timeout.ErrExceeded) || + errors.Is(execErr, context.DeadlineExceeded) { + err = common.NewErrFailsafeTimeoutExceeded(execErr) + } else if errors.Is(execErr, circuitbreaker.ErrOpen) { + err = common.NewErrFailsafeCircuitBreakerOpen(execErr) } - if errors.Is(execErr, timeout.ErrExceeded) { - return common.NewErrFailsafeTimeoutExceeded(execErr) - } - - if errors.Is(execErr, circuitbreaker.ErrOpen) { - return common.NewErrFailsafeCircuitBreakerOpen(execErr) + if err != nil { + if method != "" { + if ser, ok := execErr.(common.StandardError); ok { + be := ser.Base() + if be != nil { + if upstreamId != "" { + be.Details = map[string]interface{}{ + "upstreamId": upstreamId, + "method": method, + } + } else { + be.Details = map[string]interface{}{ + "method": method, + } + } + } + } + } + return err } return execErr diff --git a/upstream/http_json_rpc_client.go b/upstream/http_json_rpc_client.go index 4d4124a2..31c11b75 100644 --- a/upstream/http_json_rpc_client.go +++ b/upstream/http_json_rpc_client.go @@ -119,13 +119,9 @@ func (c *GenericHttpJsonRpcClient) SendRequest(ctx context.Context, req *common. startedAt := time.Now() jrReq, err := req.JsonRpcRequest() if err != nil { - m, _ := req.Method() return nil, common.NewErrUpstreamRequest( err, - c.upstream.ProjectId, - req.NetworkId(), c.upstream.Config().Id, - m, 0, 0, 0, 0, ) } @@ -213,13 +209,9 @@ func (c *GenericHttpJsonRpcClient) processBatch() { for _, req := range requests { jrReq, err := req.request.JsonRpcRequest() if err != nil { - m, _ := req.request.Method() req.err <- common.NewErrUpstreamRequest( err, - c.upstream.ProjectId, - req.request.NetworkId(), c.upstream.Config().Id, - m, 0, 0, 0, 0, ) continue @@ -369,13 +361,9 @@ func (c *GenericHttpJsonRpcClient) processBatchResponse(requests map[interface{} func (c *GenericHttpJsonRpcClient) sendSingleRequest(ctx context.Context, req *common.NormalizedRequest) (*common.NormalizedResponse, error) { jrReq, err := req.JsonRpcRequest() if err != nil { - m, _ := req.Method() return nil, common.NewErrUpstreamRequest( err, - c.upstream.ProjectId, - req.NetworkId(), c.upstream.Config().Id, - m, 0, 0, 0, 0, ) } @@ -536,7 +524,6 @@ func extractJsonRpcError(r *http.Response, nr *common.NormalizedResponse, jr *co ), ) } else if r.StatusCode == 429 || - r.StatusCode == 408 || code == -32005 || strings.Contains(err.Message, "has exceeded") || strings.Contains(err.Message, "Exceeded the quota") || diff --git a/upstream/registry.go b/upstream/registry.go index ca748334..eedc7fcc 100644 --- a/upstream/registry.go +++ b/upstream/registry.go @@ -34,8 +34,8 @@ type UpstreamsRegistry struct { } type UpstreamsHealth struct { - Upstreams []*Upstream `json:"upstreams"` - SortedUpstreams map[string]map[string][]string `json:"sortedUpstreams"` + Upstreams []*Upstream `json:"upstreams"` + SortedUpstreams map[string]map[string][]string `json:"sortedUpstreams"` UpstreamScores map[string]map[string]map[string]float64 `json:"upstreamScores"` } @@ -350,13 +350,13 @@ func (u *UpstreamsRegistry) calculateScore(normTotalRequests, normP90Latency, no score += expCurve(1 - normTotalRequests) // Higher score for lower p90 latency - score += expCurve(1 - normP90Latency) * 4 + score += expCurve(1-normP90Latency) * 4 // Higher score for lower error rate - score += expCurve(1 - normErrorRate) * 8 + score += expCurve(1-normErrorRate) * 8 // Higher score for lower throttled rate - score += expCurve(1 - normThrottledRate) * 3 + score += expCurve(1-normThrottledRate) * 3 return score } diff --git a/upstream/registry_test.go b/upstream/registry_test.go index 8bfd96f2..e2193ca8 100644 --- a/upstream/registry_test.go +++ b/upstream/registry_test.go @@ -217,7 +217,7 @@ func TestUpstreamScoring(t *testing.T) { name string windowSize time.Duration upstreamConfig []upstreamMetrics - expectedOrder []string + expectedOrder []string }{ { name: "MixedLatencyAndFailureRate", diff --git a/upstream/upstream.go b/upstream/upstream.go index aebcaa04..39e2a4b8 100644 --- a/upstream/upstream.go +++ b/upstream/upstream.go @@ -183,7 +183,7 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( cfg := u.Config() if reason, skip := u.shouldSkip(req); skip { - return nil, common.NewErrUpstreamRequestSkipped(reason, cfg.Id, req) + return nil, common.NewErrUpstreamRequestSkipped(reason, cfg.Id) } clientType := u.Client.GetType() @@ -205,10 +205,7 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( if err != nil { return nil, common.NewErrUpstreamRequest( err, - u.ProjectId, - netId, cfg.Id, - method, time.Since(startTime), 0, 0, @@ -319,10 +316,7 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( if exec != nil { return nil, common.NewErrUpstreamRequest( errCall, - u.ProjectId, - netId, cfg.Id, - method, time.Since(startTime), exec.Attempts(), exec.Retries(), @@ -331,10 +325,7 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( } else { return nil, common.NewErrUpstreamRequest( errCall, - u.ProjectId, - netId, cfg.Id, - method, time.Since(startTime), 1, 0, @@ -361,7 +352,7 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( }) if execErr != nil { - return nil, TranslateFailsafeError(execErr) + return nil, TranslateFailsafeError(u.config.Id, method, execErr) } return resp, nil