From 0a1ebcf4d84c5dfedad256154d634a3af9d4f9fb Mon Sep 17 00:00:00 2001 From: Marcin Wolny Date: Fri, 8 Dec 2023 12:50:39 +0100 Subject: [PATCH 1/2] Refactor - The request body is no longer copied multiple times. - The request body is no longer stored in context.Context. - No more recursion in ErrorHandler. - The primary target selection logic moved to ServeHTTP. --- .github/workflows/golang.yaml | 2 +- internal/proxy/proxy.go | 138 ++++++------------------------ internal/proxy/proxy_utils.go | 28 ------ internal/proxy/responsewriter.go | 31 +++++++ internal/proxy/reverse_proxy.go | 85 +++--------------- internal/rpcgateway/rpcgateway.go | 4 - 6 files changed, 69 insertions(+), 219 deletions(-) delete mode 100644 internal/proxy/proxy_utils.go create mode 100644 internal/proxy/responsewriter.go diff --git a/.github/workflows/golang.yaml b/.github/workflows/golang.yaml index 9f133e8..bb0407f 100644 --- a/.github/workflows/golang.yaml +++ b/.github/workflows/golang.yaml @@ -13,7 +13,7 @@ concurrency: cancel-in-progress: true env: - GOLANGVERSION: 1.21 + GOLANGVERSION: 1.21.5 jobs: test: diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 2cb145c..7c81ea1 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -2,17 +2,13 @@ package proxy import ( "bytes" - "context" - "errors" "io" "net/http" "net/http/httputil" - "strconv" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/zap" ) type HTTPTarget struct { @@ -74,8 +70,8 @@ func NewProxy(proxyConfig Config, healthCheckManager *HealthcheckManager) *Proxy }), } - for index, target := range proxy.config.Targets { - if err := proxy.AddTarget(target, uint(index)); err != nil { + for _, target := range proxy.config.Targets { + if err := proxy.AddTarget(target); err != nil { panic(err) } } @@ -83,93 +79,12 @@ func NewProxy(proxyConfig Config, healthCheckManager *HealthcheckManager) *Proxy return proxy } -func (h *Proxy) 
doModifyResponse(config TargetConfig) func(*http.Response) error { - return func(resp *http.Response) error { - h.metricResponseStatus.WithLabelValues(config.Name, strconv.Itoa(resp.StatusCode)).Inc() - - switch { - // Here's the thing. A different provider may response with a - // different status code for the same query. e.g. call for - // a block that does not exist, Alchemy will serve HTTP 400 - // where Infura will serve HTTP 200. Both of these responses - // hold a concrete error in jsonrpc message. - // - // Having this in mind, we may consider a provider unreliable - // upon these events: - // - HTTP 5xx responses - // - Cannot make a connection after X of retries. - // - // Everything else, as long as it's jsonrpc payload should be - // considered as successful response. - // - case resp.StatusCode == http.StatusTooManyRequests: - // this code generates a fallback to backup provider. - // - zap.L().Warn("rate limited", zap.String("provider", config.Name)) - - return errors.New("rate limited") - - case resp.StatusCode >= http.StatusInternalServerError: - // this code generates a fallback to backup provider. - // - zap.L().Warn("server error", zap.String("provider", config.Name)) - - return errors.New("server error") - } - - return nil - } -} - -func (h *Proxy) doErrorHandler(config TargetConfig, index uint) func(http.ResponseWriter, *http.Request, error) { - return func(w http.ResponseWriter, r *http.Request, e error) { - // The client canceled the request (e.g. 0x API has a 5s timeout for RPC request) - // we stop here as it doesn't make sense to retry/reroute anymore. 
- // Also, we don't want to observe a client-canceled request as a failure - if errors.Is(e, context.Canceled) { - h.metricRequestErrors.WithLabelValues(config.Name, "client_closed_connection").Inc() - - return - } - - // Workaround to reserve request body in ReverseProxy.ErrorHandler see - // more here: https://github.com/golang/go/issues/33726 - // - if buf, ok := r.Context().Value("bodybuf").(*bytes.Buffer); ok { - r.Body = io.NopCloser(buf) - } - - zap.L().Warn("handling a failed request", zap.String("provider", config.Name), zap.Error(e)) - - // route the request to a different target - h.metricRequestErrors.WithLabelValues(config.Name, "rerouted").Inc() - visitedTargets := GetVisitedTargetsFromContext(r) - - // add the current target to the VisitedTargets slice to exclude it when selecting - // the next target - ctx := context.WithValue(r.Context(), VisitedTargets, append(visitedTargets, index)) - - // adding the targetname in case it errors out and needs to be - // used in metrics in ServeHTTP. 
- ctx = context.WithValue(ctx, TargetName, config.Name) - - h.ServeHTTP(w, r.WithContext(ctx)) - } -} - -func (h *Proxy) AddTarget(target TargetConfig, index uint) error { +func (h *Proxy) AddTarget(target TargetConfig) error { proxy, err := NewReverseProxy(target, h.config) if err != nil { return err } - // NOTE: any error returned from ModifyResponse will be handled by - // ErrorHandler - // proxy.ModifyResponse = h.doModifyResponse(config) - // - proxy.ModifyResponse = h.doModifyResponse(target) // nolint:bodyclose - proxy.ErrorHandler = h.doErrorHandler(target, index) - h.targets = append( h.targets, &HTTPTarget{ @@ -180,42 +95,39 @@ func (h *Proxy) AddTarget(target TargetConfig, index uint) error { return nil } -func (h *Proxy) GetNextTarget() *HTTPTarget { - idx := h.healthcheckManager.GetNextHealthyTargetIndex() +func (h *Proxy) HasNodeProviderFailed(statusCode int) bool { + return statusCode >= http.StatusInternalServerError || statusCode == http.StatusTooManyRequests +} - if idx < 0 { - return nil +func (h *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + body := &bytes.Buffer{} + + if _, err := io.Copy(body, r.Body); err != nil { + http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable) } - return h.targets[idx] -} + for _, target := range h.targets { + start := time.Now() -func (h *Proxy) GetNextTargetExcluding(indexes []uint) *HTTPTarget { - idx := h.healthcheckManager.GetNextHealthyTargetIndexExcluding(indexes) + pw := NewResponseWriter() + r.Body = io.NopCloser(bytes.NewBuffer(body.Bytes())) - if idx < 0 { - return nil - } + target.Proxy.ServeHTTP(pw, r) - return h.targets[idx] -} + if h.HasNodeProviderFailed(pw.statusCode) { + h.metricResponseTime.WithLabelValues(target.Config.Name, r.Method).Observe(time.Since(start).Seconds()) + h.metricRequestErrors.WithLabelValues(target.Config.Name, "rerouted").Inc() -func (h *Proxy) GetNextTargetName() string { - return h.GetNextTarget().Config.Name -} + continue 
+ } -func (h *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { - visitedTargets := GetVisitedTargetsFromContext(r) + w.WriteHeader(pw.statusCode) + w.Write(pw.body.Bytes()) // nolint:errcheck - peer := h.GetNextTargetExcluding(visitedTargets) - if peer != nil { - start := time.Now() - peer.Proxy.ServeHTTP(w, r) - duration := time.Since(start) - h.metricResponseTime.WithLabelValues(peer.Config.Name, r.Method).Observe(duration.Seconds()) + h.metricResponseTime.WithLabelValues(target.Config.Name, r.Method).Observe(time.Since(start).Seconds()) return } - http.Error(w, "Service not available", http.StatusServiceUnavailable) + http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable) } diff --git a/internal/proxy/proxy_utils.go b/internal/proxy/proxy_utils.go deleted file mode 100644 index aa61e2b..0000000 --- a/internal/proxy/proxy_utils.go +++ /dev/null @@ -1,28 +0,0 @@ -package proxy - -import ( - "net/http" -) - -type ContextFailoverKeyInt int - -const ( - TargetName ContextFailoverKeyInt = iota - VisitedTargets -) - -// GetVisitedTargetsFromContext returns the visited targets for request. -func GetVisitedTargetsFromContext(r *http.Request) []uint { - if visitedTargets, ok := r.Context().Value(VisitedTargets).([]uint); ok { - return visitedTargets - } - return []uint{} -} - -// GetTargetNameFromContext returns the target name for request. 
-func GetTargetNameFromContext(r *http.Request) string { - if targetName, ok := r.Context().Value(TargetName).(string); ok { - return targetName - } - return "" -} diff --git a/internal/proxy/responsewriter.go b/internal/proxy/responsewriter.go new file mode 100644 index 0000000..634a68c --- /dev/null +++ b/internal/proxy/responsewriter.go @@ -0,0 +1,31 @@ +package proxy + +import ( + "bytes" + "net/http" +) + +type ReponseWriter struct { + body *bytes.Buffer + header http.Header + statusCode int +} + +func (p *ReponseWriter) Header() http.Header { + return p.header +} + +func (p *ReponseWriter) Write(b []byte) (int, error) { + return p.body.Write(b) +} + +func (p *ReponseWriter) WriteHeader(statusCode int) { + p.statusCode = statusCode +} + +func NewResponseWriter() *ReponseWriter { + return &ReponseWriter{ + header: http.Header{}, + body: &bytes.Buffer{}, + } +} diff --git a/internal/proxy/reverse_proxy.go b/internal/proxy/reverse_proxy.go index f086017..8c72e7e 100644 --- a/internal/proxy/reverse_proxy.go +++ b/internal/proxy/reverse_proxy.go @@ -3,12 +3,12 @@ package proxy import ( "bytes" "compress/gzip" - "context" "io" "net" "net/http" "net/http/httputil" "net/url" + "strings" "time" "github.com/mwitkow/go-conntrack" @@ -17,88 +17,30 @@ import ( "go.uber.org/zap" ) -// TODO -// This code needs a new abstraction. We should bring a model and attach helper to a model. -// - func doProcessRequest(r *http.Request, config TargetConfig) error { - var body io.Reader - var buf bytes.Buffer - var err error - - if r.Body == nil { - return errors.New("no body") - } - - // The standard library stores ContentLength as signed data type. 
- // - if r.ContentLength == 0 || r.ContentLength < 0 { - return errors.New("invalid content length") - } - - if r.Header.Get("Content-Encoding") == "gzip" && !config.Connection.HTTP.Compression { - body, err = doGunzip(r) - if err != nil { - return errors.Wrap(err, "cannot gunzip data") - } - } else { - body = io.TeeReader(r.Body, &buf) - } - - // I don't like so much but the refactor is coming up soon! - // - // This is nothing more than ugly a workaround. - // This code guarantee the context buf will not be empty upon primary - // provider roundtrip failures. - // - data, err := io.ReadAll(body) - if err != nil { - return errors.New("cannot read body") + if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") && !config.Connection.HTTP.Compression { + return errors.Wrap(doGunzip(r), "gunzip failed") } - r.Body = io.NopCloser(bytes.NewBuffer(data)) - - // Here's an interesting fact. There's no data in buf, until a call - // to Read(). With Read() call, it will write data to bytes.Buffer. - // - // I want to call it out, because it's damn smart. - // - ctx := context.WithValue(r.Context(), "bodybuf", &buf) // nolint:revive,staticcheck - - // WithContext creates a shallow copy. It's highly important to - // override underlying memory pointed by pointer. - // - r2 := r.WithContext(ctx) - *r = *r2 - return nil } -func doGunzip(r *http.Request) (io.Reader, error) { - var buf bytes.Buffer - var body io.Reader - +func doGunzip(r *http.Request) error { uncompressed, err := gzip.NewReader(r.Body) if err != nil { - return nil, errors.Wrap(err, "cannot decompress the data") - } - // Decompress the body. 
- // - data, err := io.ReadAll(uncompressed) - if err != nil { - return nil, errors.Wrap(err, "cannot read uncompressed data") + return errors.Wrap(err, "cannot decompress the data") } - // Replace body content with uncompressed data - // Remove the "Content-Encoding: gzip" because the body is decompressed already - // and correct the Content-Length header - // - body = io.TeeReader(bytes.NewReader(data), &buf) + body := &bytes.Buffer{} + if _, err := io.Copy(body, uncompressed); err != nil { // nolint:gosec + return errors.Wrap(err, "cannot read uncompressed data") + } r.Header.Del("Content-Encoding") - r.ContentLength = int64(len(data)) + r.Body = io.NopCloser(body) + r.ContentLength = int64(body.Len()) - return body, nil + return nil } func NewReverseProxy(targetConfig TargetConfig, config Config) (*httputil.ReverseProxy, error) { @@ -114,9 +56,6 @@ func NewReverseProxy(targetConfig TargetConfig, config Config) (*httputil.Revers r.URL.Host = target.Host r.URL.Path = target.Path - // Workaround to reserve request body in ReverseProxy.ErrorHandler - // see more here: https://github.com/golang/go/issues/33726 - // if err := doProcessRequest(r, targetConfig); err != nil { zap.L().Error("cannot process request", zap.Error(err)) } diff --git a/internal/rpcgateway/rpcgateway.go b/internal/rpcgateway/rpcgateway.go index 1cd2702..1549d5c 100644 --- a/internal/rpcgateway/rpcgateway.go +++ b/internal/rpcgateway/rpcgateway.go @@ -62,10 +62,6 @@ func (r *RPCGateway) Stop(ctx context.Context) error { return r.server.Close() } -func (r *RPCGateway) GetCurrentTarget() string { - return r.httpFailoverProxy.GetNextTargetName() -} - func NewRPCGateway(config RPCGatewayConfig) *RPCGateway { healthcheckManager := proxy.NewHealthcheckManager( proxy.HealthcheckManagerConfig{ From 84305b07a9cc6c26ad195f4b3b23cf21c2e9360e Mon Sep 17 00:00:00 2001 From: Marcin Wolny Date: Sat, 9 Dec 2023 10:36:27 +0100 Subject: [PATCH 2/2] fix: rate limits exceeded Use private nodes when running tests 
and fallback to public nodes when configuration is not provided. --- .golangci.yml | 1 + go.mod | 1 + go.sum | 2 ++ internal/proxy/healthchecker_test.go | 5 +++-- internal/proxy/manager_test.go | 7 ++++--- internal/rpcgateway/rpcgateway_test.go | 5 +++-- 6 files changed, 14 insertions(+), 7 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index b7f94c6..c41e523 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -59,6 +59,7 @@ linters-settings: - github.com/stretchr/testify - github.com/slok/go-http-metrics - github.com/purini-to/zapmw + - github.com/caitlinelfring/go-env-default issues: max-same-issues: 0 # unlimited diff --git a/go.mod b/go.mod index 5eebdc0..af1105c 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/Shopify/toxiproxy v2.1.4+incompatible + github.com/caitlinelfring/go-env-default v1.1.0 github.com/ethereum/go-ethereum v1.13.5 github.com/gorilla/mux v1.8.1 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f diff --git a/go.sum b/go.sum index 2e2b733..3bdbeae 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWso github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/caitlinelfring/go-env-default v1.1.0 h1:bhDfXmUolvcIGfQCX8qevQX8wxC54NGz0aimoUnhvDM= +github.com/caitlinelfring/go-env-default v1.1.0/go.mod h1:tESXPr8zFPP/cRy3cwxrHBmjJIf2A1x/o4C9CET2rEk= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/internal/proxy/healthchecker_test.go b/internal/proxy/healthchecker_test.go index 
9593763..803d7c5 100644 --- a/internal/proxy/healthchecker_test.go +++ b/internal/proxy/healthchecker_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/caitlinelfring/go-env-default" "github.com/stretchr/testify/assert" "go.uber.org/zap" ) @@ -22,7 +23,7 @@ func TestBasicHealthchecker(t *testing.T) { defer cancel() healtcheckConfig := RPCHealthcheckerConfig{ - URL: "https://cloudflare-eth.com", + URL: env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://cloudflare-eth.com"), Interval: 1 * time.Second, Timeout: 2 * time.Second, FailureThreshold: 1, @@ -48,7 +49,7 @@ func TestBasicHealthchecker(t *testing.T) { func TestGasLeftCall(t *testing.T) { client := &http.Client{} - url := "https://cloudflare-eth.com" + url := env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://cloudflare-eth.com") result, err := performGasLeftCall(context.TODO(), client, url) assert.Nil(t, err) diff --git a/internal/proxy/manager_test.go b/internal/proxy/manager_test.go index b009157..13cf1a2 100644 --- a/internal/proxy/manager_test.go +++ b/internal/proxy/manager_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/caitlinelfring/go-env-default" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" ) @@ -18,7 +19,7 @@ func TestHealthcheckManager(t *testing.T) { Name: "Primary", Connection: TargetConfigConnection{ HTTP: TargetConnectionHTTP{ - URL: "https://cloudflare-eth.com", + URL: env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://cloudflare-eth.com"), }, }, }, @@ -26,7 +27,7 @@ func TestHealthcheckManager(t *testing.T) { Name: "StandBy", Connection: TargetConfigConnection{ HTTP: TargetConnectionHTTP{ - URL: "https://cloudflare-eth.com", + URL: env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://eth.public-rpc.com"), }, }, }, @@ -65,7 +66,7 @@ func TestGetNextHealthyTargetIndexExcluding(t *testing.T) { Name: "Primary", Connection: TargetConfigConnection{ HTTP: TargetConnectionHTTP{ - URL: "https://cloudflare-eth.com", + URL: 
env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://cloudflare-eth.com"), }, }, }, diff --git a/internal/rpcgateway/rpcgateway_test.go b/internal/rpcgateway/rpcgateway_test.go index b8f5f8a..6945fa6 100644 --- a/internal/rpcgateway/rpcgateway_test.go +++ b/internal/rpcgateway/rpcgateway_test.go @@ -11,6 +11,7 @@ import ( "testing" toxiproxy "github.com/Shopify/toxiproxy/client" + "github.com/caitlinelfring/go-env-default" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "go.uber.org/zap" @@ -77,7 +78,7 @@ func TestRpcGatewayFailover(t *testing.T) { err := toxiClient.ResetState() assert.Nil(t, err) - proxy, err := toxiClient.CreateProxy("cloudflare", "0.0.0.0:9991", ts.URL[7:]) + proxy, err := toxiClient.CreateProxy("primary", "0.0.0.0:9991", ts.URL[7:]) assert.Nil(t, err) _, err = proxy.AddToxic("latency_down", "latency", "downstream", 1.0, toxiproxy.Attributes{ @@ -92,7 +93,7 @@ func TestRpcGatewayFailover(t *testing.T) { // config string var tpl bytes.Buffer - tu := TestURL{"http://0.0.0.0:9991", "https://cloudflare-eth.com"} + tu := TestURL{"http://0.0.0.0:9991", env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://cloudflare-eth.com")} tmpl, err := template.New("test").Parse(rpcGatewayConfig) assert.Nil(t, err)