From b8c9936cbe6e2579dfb97f4c4aa1dd305371ca0e Mon Sep 17 00:00:00 2001 From: Marcin Wolny Date: Thu, 10 Nov 2022 09:32:36 +0100 Subject: [PATCH] no reverse proxy --- internal/proxy/manager.go | 11 -- internal/proxy/manager_test.go | 6 - internal/proxy/proxy.go | 185 ++++++++---------------------- internal/proxy/proxy_test.go | 77 +++++-------- internal/proxy/target.go | 83 ++++++++++++++ internal/rpcgateway/rpcgateway.go | 4 - 6 files changed, 156 insertions(+), 210 deletions(-) create mode 100644 internal/proxy/target.go diff --git a/internal/proxy/manager.go b/internal/proxy/manager.go index 51998f9..d7148ba 100644 --- a/internal/proxy/manager.go +++ b/internal/proxy/manager.go @@ -157,17 +157,6 @@ func (h *HealthcheckManager) Stop(ctx context.Context) error { return nil } -func (h *HealthcheckManager) GetTargetIndexByName(name string) int { - for idx, healthChecker := range h.healthcheckers { - if healthChecker.Name() == name { - return idx - } - } - - zap.L().Error("tried to access a non-existing Healthchecker", zap.String("name", name)) - return 0 -} - func (h *HealthcheckManager) GetTargetByName(name string) Healthchecker { for _, healthChecker := range h.healthcheckers { if healthChecker.Name() == name { diff --git a/internal/proxy/manager_test.go b/internal/proxy/manager_test.go index ef99f16..30ea5f9 100644 --- a/internal/proxy/manager_test.go +++ b/internal/proxy/manager_test.go @@ -43,16 +43,10 @@ func TestHealthcheckManager(t *testing.T) { ctx := context.TODO() go manager.Start(ctx) - nextIdx := manager.GetNextHealthyTargetIndex() - assert.Zero(t, nextIdx) - time.Sleep(1 * time.Second) manager.TaintTarget("AnkrOne") - nextIdx = manager.GetNextHealthyTargetIndex() - assert.Equal(t, 1, nextIdx) - manager.Stop(ctx) } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 2cb145c..fcdc837 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -3,30 +3,22 @@ package proxy import ( "bytes" "context" - "errors" "io" "net/http" - "net/http/httputil" "strconv" - "time" + + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/zap" ) -type HTTPTarget struct { - Config TargetConfig - Proxy *httputil.ReverseProxy -} - type Proxy struct { config Config targets []*HTTPTarget healthcheckManager *HealthcheckManager metricResponseTime *prometheus.HistogramVec - metricRequestErrors *prometheus.CounterVec metricResponseStatus *prometheus.CounterVec } @@ -57,162 +49,79 @@ func NewProxy(proxyConfig Config, healthCheckManager *HealthcheckManager) *Proxy "provider", "method", }), - metricRequestErrors: promauto.NewCounterVec( + metricResponseStatus: promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "zeroex_rpc_gateway_request_errors_handled_total", - Help: "The total number of request errors handled by gateway", + Name: "zeroex_rpc_gateway_target_response_status_total", + Help: "Total number of responses with a statuscode label", }, []string{ "provider", - "type", + "status_code", }), - metricResponseStatus: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "zeroex_rpc_gateway_target_response_status_total", - Help: "Total number of responses with a statuscode label", - }, []string{ - "provider", - "status_code", - }), } - for index, target := range proxy.config.Targets { - if err := proxy.AddTarget(target, uint(index)); err != nil { - panic(err) + for _, target := range proxy.config.Targets { + t := &HTTPTarget{ + Config: target, + ClientOptions: HTTPTargetClientOptions{ + Timeout: proxy.config.Proxy.UpstreamTimeout, + }, } + proxy.targets = append(proxy.targets, t) } return proxy } -func (h *Proxy) doModifyResponse(config TargetConfig) func(*http.Response) error { - return func(resp *http.Response) error { - h.metricResponseStatus.WithLabelValues(config.Name, strconv.Itoa(resp.StatusCode)).Inc() - - switch { - // Here's the thing. A different provider may response with a - // different status code for the same query. e.g. call for - // a block that does not exist, Alchemy will serve HTTP 400 - // where Infura will serve HTTP 200. Both of these responses - // hold a concrete error in jsonrpc message. - // - // Having this in mind, we may consider a provider unreliable - // upon these events: - // - HTTP 5xx responses - // - Cannot make a connection after X of retries. - // - // Everything else, as long as it's jsonrpc payload should be - // considered as successful response. - // - case resp.StatusCode == http.StatusTooManyRequests: - // this code generates a fallback to backup provider. - // - zap.L().Warn("rate limited", zap.String("provider", config.Name)) - - return errors.New("rate limited") - - case resp.StatusCode >= http.StatusInternalServerError: - // this code generates a fallback to backup provider. - // - zap.L().Warn("server error", zap.String("provider", config.Name)) - - return errors.New("server error") - } - - return nil +func (h *Proxy) hasError(r *http.Response) bool { + if r.StatusCode == http.StatusTooManyRequests { + return true } -} - -func (h *Proxy) doErrorHandler(config TargetConfig, index uint) func(http.ResponseWriter, *http.Request, error) { - return func(w http.ResponseWriter, r *http.Request, e error) { - // The client canceled the request (e.g. 0x API has a 5s timeout for RPC request) - // we stop here as it doesn't make sense to retry/reroute anymore. - // Also, we don't want to observe a client-canceled request as a failure - if errors.Is(e, context.Canceled) { - h.metricRequestErrors.WithLabelValues(config.Name, "client_closed_connection").Inc() - - return - } - - // Workaround to reserve request body in ReverseProxy.ErrorHandler see - // more here: https://github.com/golang/go/issues/33726 - // - if buf, ok := r.Context().Value("bodybuf").(*bytes.Buffer); ok { - r.Body = io.NopCloser(buf) - } - - zap.L().Warn("handling a failed request", zap.String("provider", config.Name), zap.Error(e)) - // route the request to a different target - h.metricRequestErrors.WithLabelValues(config.Name, "rerouted").Inc() - visitedTargets := GetVisitedTargetsFromContext(r) - - // add the current target to the VisitedTargets slice to exclude it when selecting - // the next target - ctx := context.WithValue(r.Context(), VisitedTargets, append(visitedTargets, index)) - - // adding the targetname in case it errors out and needs to be - // used in metrics in ServeHTTP. - ctx = context.WithValue(ctx, TargetName, config.Name) - - h.ServeHTTP(w, r.WithContext(ctx)) + if r.StatusCode >= http.StatusInternalServerError { + return true } + + return false } -func (h *Proxy) AddTarget(target TargetConfig, index uint) error { - proxy, err := NewReverseProxy(target, h.config) +func (h *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) if err != nil { - return err + http.Error(w, "Internal Server Error", http.StatusInternalServerError) } - // NOTE: any error returned from ModifyResponse will be handled by - // ErrorHandler - // proxy.ModifyResponse = h.doModifyResponse(config) - // - proxy.ModifyResponse = h.doModifyResponse(target) // nolint:bodyclose - proxy.ErrorHandler = h.doErrorHandler(target, index) - - h.targets = append( - h.targets, - &HTTPTarget{ - Config: target, - Proxy: proxy, - }) - - return nil -} + for _, target := range h.targets { + if errors.Is(r.Context().Err(), context.Canceled) { + return + } -func (h *Proxy) GetNextTarget() *HTTPTarget { - idx := h.healthcheckManager.GetNextHealthyTargetIndex() + if !h.healthcheckManager.IsTargetHealthy(target.Config.Name) { + continue + } - if idx < 0 { - return nil - } + c, cancel := context.WithTimeout(context.Background(), h.config.Proxy.UpstreamTimeout) + defer cancel() - return h.targets[idx] -} + r.Body = io.NopCloser(bytes.NewBuffer(body)) -func (h *Proxy) GetNextTargetExcluding(indexes []uint) *HTTPTarget { - idx := h.healthcheckManager.GetNextHealthyTargetIndexExcluding(indexes) + resp, err := target.Do(c, r) + if err != nil { + continue + } + defer resp.Body.Close() - if idx < 0 { - return nil - } + h.metricResponseStatus.WithLabelValues( + target.Config.Name, strconv.Itoa(resp.StatusCode)).Inc() - return h.targets[idx] -} + if h.hasError(resp) { + continue + } -func (h *Proxy) GetNextTargetName() string { - return h.GetNextTarget().Config.Name -} + if _, err := io.Copy(w, resp.Body); err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) -func (h *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { - visitedTargets := GetVisitedTargetsFromContext(r) - - peer := h.GetNextTargetExcluding(visitedTargets) - if peer != nil { - start := time.Now() - peer.Proxy.ServeHTTP(w, r) - duration := time.Since(start) - h.metricResponseTime.WithLabelValues(peer.Config.Name, r.Method).Observe(duration.Seconds()) + return + } return } diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index f2824c7..9c8e962 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "strconv" "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -16,13 +17,13 @@ import ( func createConfig() Config { return Config{ Proxy: ProxyConfig{ - UpstreamTimeout: 0, + UpstreamTimeout: 1 * time.Second, }, HealthChecks: HealthCheckConfig{ - Interval: 0, - Timeout: 0, - FailureThreshold: 0, - SuccessThreshold: 0, + Interval: 5 * time.Second, + Timeout: 1 * time.Second, + FailureThreshold: 2, + SuccessThreshold: 1, }, Targets: []TargetConfig{}, } @@ -133,26 +134,15 @@ func TestHttpFailoverProxyDecompressRequest(t *testing.T) { req, err := http.NewRequest("POST", "/", &buf) req.Header.Add("Content-Encoding", "gzip") - if err != nil { - t.Fatal(err) - } + assert.Nil(t, err) rr := httptest.NewRecorder() handler := http.HandlerFunc(httpFailoverProxy.ServeHTTP) handler.ServeHTTP(rr, req) - want := `{"body": "content"}` - if receivedBody != want { - t.Errorf("the proxy didn't decompress the request before forwarding the body to the target: want: %s, got: %s", want, receivedBody) - } - want = "" - if receivedHeaderContentEncoding != want { - t.Errorf("the proxy didn't remove the `Content-Encoding: gzip` after decompressing the body, want empty, got: %s", receivedHeaderContentEncoding) - } - want = strconv.Itoa(len(`{"body": "content"}`)) - if receivedHeaderContentLength != want { - t.Errorf("the proxy didn't correctly re-calculate the `Content-Length` after decompressing the body, want: %s, got: %s", want, receivedHeaderContentLength) - } + assert.Equal(t, receivedBody, `{"body": "content"}`) + assert.Equal(t, receivedHeaderContentEncoding, "") + assert.Equal(t, receivedHeaderContentLength, strconv.Itoa(len(`{"body": "content"}`))) } func TestHttpFailoverProxyWithCompressionSupportedTarget(t *testing.T) { @@ -190,37 +180,29 @@ func TestHttpFailoverProxyWithCompressionSupportedTarget(t *testing.T) { var buf bytes.Buffer g := gzip.NewWriter(&buf) _, err := g.Write([]byte(`{"body": "content"}`)) - if err != nil { - t.Fatal(err) - } + assert.Nil(t, err) + err = g.Close() - if err != nil { - t.Fatal(err) - } + assert.Nil(t, err) req, err := http.NewRequest("POST", "/", &buf) req.Header.Add("Content-Encoding", "gzip") - if err != nil { - t.Fatal(err) - } + + assert.Nil(t, err) rr := httptest.NewRecorder() handler := http.HandlerFunc(httpFailoverProxy.ServeHTTP) handler.ServeHTTP(rr, req) - want := "gzip" - if receivedHeaderContentEncoding != want { - t.Errorf("the proxy didn't keep the header of `Content-Encoding: gzip`, want: %s, got: %s", want, receivedHeaderContentEncoding) - } + assert.Equal(t, receivedHeaderContentEncoding, "gzip") var wantBody bytes.Buffer g = gzip.NewWriter(&wantBody) g.Write([]byte(`{"body": "content"}`)) g.Close() - if !bytes.Equal(receivedBody, wantBody.Bytes()) { - t.Errorf("the proxy didn't keep the body as is when forwarding gzipped body to the target.") - } + // t.Errorf("the proxy didn't keep the body as is when forwarding gzipped body to the target.") + assert.Equal(t, receivedBody, wantBody.Bytes()) } func TestHTTPFailoverProxyWhenCannotConnectToPrimaryProvider(t *testing.T) { @@ -254,10 +236,11 @@ func TestHTTPFailoverProxyWhenCannotConnectToPrimaryProvider(t *testing.T) { }, }, } - healthcheckManager := NewHealthcheckManager(HealthcheckManagerConfig{ - Targets: rpcGatewayConfig.Targets, - Config: rpcGatewayConfig.HealthChecks, - }) + healthcheckManager := NewHealthcheckManager( + HealthcheckManagerConfig{ + Targets: rpcGatewayConfig.Targets, + Config: rpcGatewayConfig.HealthChecks, + }) // Setup HttpFailoverProxy but not starting the HealthCheckManager so the // no target will be tainted or marked as unhealthy by the @@ -268,21 +251,13 @@ func TestHTTPFailoverProxyWhenCannotConnectToPrimaryProvider(t *testing.T) { requestBody := bytes.NewBufferString(`{"this_is": "body"}`) req, err := http.NewRequest("POST", "/", requestBody) - if err != nil { - t.Fatal(err) - } + assert.Nil(t, err) rr := httptest.NewRecorder() handler := http.HandlerFunc(httpFailoverProxy.ServeHTTP) handler.ServeHTTP(rr, req) - if status := rr.Code; status != http.StatusOK { - t.Errorf("server returned wrong status code: got %v want %v", status, http.StatusOK) - } - - want := `{"this_is": "body"}` - if rr.Body.String() != want { - t.Errorf("server returned unexpected body: got '%v' want '%v'", rr.Body.String(), want) - } + assert.Equal(t, rr.Code, http.StatusOK) + assert.Equal(t, rr.Body.String(), `{"this_is": "body"}`) } diff --git a/internal/proxy/target.go b/internal/proxy/target.go new file mode 100644 index 0000000..941a286 --- /dev/null +++ b/internal/proxy/target.go @@ -0,0 +1,83 @@ +package proxy + +import ( + "bytes" + "compress/gzip" + "context" + "io" + "net/http" + "strings" + "time" + + "github.com/pkg/errors" +) + +type HTTPTarget struct { + Config TargetConfig + ClientOptions HTTPTargetClientOptions +} + +type HTTPTargetClientOptions struct { + Timeout time.Duration +} + +func (h *HTTPTarget) gunzip(c context.Context, r *http.Request) (*http.Request, error) { + gz, err := gzip.NewReader(r.Body) + if err != nil { + return nil, err + } + + /* + Function NewRequestWithContext sets Content-Length for known types ONLY. + Otherwise, you have read data before passing it through. + */ + + data, err := io.ReadAll(gz) + if err != nil { + return nil, err + } + + return http.NewRequestWithContext(c, + http.MethodPost, h.Config.Connection.HTTP.URL, bytes.NewReader(data)) +} + +func (h *HTTPTarget) buildRequest(c context.Context, r *http.Request) (*http.Request, error) { + ok := strings.Contains(r.Header.Get("Content-Encoding"), "gzip") + + if h.Config.Connection.HTTP.Compression { + if ok { + req, err := http.NewRequestWithContext(c, + http.MethodPost, h.Config.Connection.HTTP.URL, r.Body) + req.Header.Set("Content-Encoding", "gzip") + + return req, err + } + + return h.gunzip(c, r) + } + + if !ok { + return http.NewRequestWithContext(c, + http.MethodPost, h.Config.Connection.HTTP.URL, r.Body) + } + + return h.gunzip(c, r) +} + +func (h *HTTPTarget) Do(c context.Context, r *http.Request) (*http.Response, error) { + /* + The requset may be compressed or not. + The backend may support gzip or not. + */ + + req, err := h.buildRequest(c, r) + if err != nil { + return nil, errors.Wrap(err, "request build failed") + } + + client := &http.Client{ + Timeout: h.ClientOptions.Timeout, + } + + return client.Do(req) +} diff --git a/internal/rpcgateway/rpcgateway.go b/internal/rpcgateway/rpcgateway.go index 1cd2702..1549d5c 100644 --- a/internal/rpcgateway/rpcgateway.go +++ b/internal/rpcgateway/rpcgateway.go @@ -62,10 +62,6 @@ func (r *RPCGateway) Stop(ctx context.Context) error { return r.server.Close() } -func (r *RPCGateway) GetCurrentTarget() string { - return r.httpFailoverProxy.GetNextTargetName() -} - func NewRPCGateway(config RPCGatewayConfig) *RPCGateway { healthcheckManager := proxy.NewHealthcheckManager( proxy.HealthcheckManagerConfig{