Skip to content
This repository has been archived by the owner on Jul 22, 2024. It is now read-only.

Commit

Permalink
no reverse proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
eitu5ami committed Oct 5, 2023
1 parent b9c4880 commit 22743f3
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 159 deletions.
11 changes: 0 additions & 11 deletions internal/proxy/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,6 @@ func (h *HealthcheckManager) Stop(ctx context.Context) error {
return nil
}

func (h *HealthcheckManager) GetTargetIndexByName(name string) int {
for idx, healthChecker := range h.healthcheckers {
if healthChecker.Name() == name {
return idx
}
}

zap.L().Error("tried to access a non-existing Healthchecker", zap.String("name", name))
return 0
}

func (h *HealthcheckManager) GetTargetByName(name string) Healthchecker {
for _, healthChecker := range h.healthcheckers {
if healthChecker.Name() == name {
Expand Down
6 changes: 0 additions & 6 deletions internal/proxy/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,10 @@ func TestHealthcheckManager(t *testing.T) {
ctx := context.TODO()
go manager.Start(ctx)

nextIdx := manager.GetNextHealthyTargetIndex()
assert.Zero(t, nextIdx)

time.Sleep(1 * time.Second)

manager.TaintTarget("AnkrOne")

nextIdx = manager.GetNextHealthyTargetIndex()
assert.Equal(t, 1, nextIdx)

manager.Stop(ctx)
}

Expand Down
185 changes: 47 additions & 138 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,22 @@ package proxy
import (
"bytes"
"context"
"errors"
"io"
"net/http"
"net/http/httputil"
"strconv"
"time"

"github.com/pkg/errors"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/zap"
)

type HTTPTarget struct {
Config TargetConfig
Proxy *httputil.ReverseProxy
}

type Proxy struct {
config Config
targets []*HTTPTarget
healthcheckManager *HealthcheckManager

metricResponseTime *prometheus.HistogramVec
metricRequestErrors *prometheus.CounterVec
metricResponseStatus *prometheus.CounterVec
}

Expand Down Expand Up @@ -57,162 +49,79 @@ func NewProxy(proxyConfig Config, healthCheckManager *HealthcheckManager) *Proxy
"provider",
"method",
}),
metricRequestErrors: promauto.NewCounterVec(
metricResponseStatus: promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "zeroex_rpc_gateway_request_errors_handled_total",
Help: "The total number of request errors handled by gateway",
Name: "zeroex_rpc_gateway_target_response_status_total",
Help: "Total number of responses with a statuscode label",
}, []string{
"provider",
"type",
"status_code",
}),
metricResponseStatus: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "zeroex_rpc_gateway_target_response_status_total",
Help: "Total number of responses with a statuscode label",
}, []string{
"provider",
"status_code",
}),
}

for index, target := range proxy.config.Targets {
if err := proxy.AddTarget(target, uint(index)); err != nil {
panic(err)
for _, target := range proxy.config.Targets {
t := &HTTPTarget{
Config: target,
ClientOptions: HTTPTargetClientOptions{
Timeout: proxy.config.Proxy.UpstreamTimeout,
},
}
proxy.targets = append(proxy.targets, t)
}

return proxy
}

func (h *Proxy) doModifyResponse(config TargetConfig) func(*http.Response) error {
return func(resp *http.Response) error {
h.metricResponseStatus.WithLabelValues(config.Name, strconv.Itoa(resp.StatusCode)).Inc()

switch {
// Here's the thing. A different provider may response with a
// different status code for the same query. e.g. call for
// a block that does not exist, Alchemy will serve HTTP 400
// where Infura will serve HTTP 200. Both of these responses
// hold a concrete error in jsonrpc message.
//
// Having this in mind, we may consider a provider unreliable
// upon these events:
// - HTTP 5xx responses
// - Cannot make a connection after X of retries.
//
// Everything else, as long as it's jsonrpc payload should be
// considered as successful response.
//
case resp.StatusCode == http.StatusTooManyRequests:
// this code generates a fallback to backup provider.
//
zap.L().Warn("rate limited", zap.String("provider", config.Name))

return errors.New("rate limited")

case resp.StatusCode >= http.StatusInternalServerError:
// this code generates a fallback to backup provider.
//
zap.L().Warn("server error", zap.String("provider", config.Name))

return errors.New("server error")
}

return nil
func (h *Proxy) hasError(r *http.Response) bool {
if r.StatusCode == http.StatusTooManyRequests {
return true
}
}

func (h *Proxy) doErrorHandler(config TargetConfig, index uint) func(http.ResponseWriter, *http.Request, error) {
return func(w http.ResponseWriter, r *http.Request, e error) {
// The client canceled the request (e.g. 0x API has a 5s timeout for RPC request)
// we stop here as it doesn't make sense to retry/reroute anymore.
// Also, we don't want to observe a client-canceled request as a failure
if errors.Is(e, context.Canceled) {
h.metricRequestErrors.WithLabelValues(config.Name, "client_closed_connection").Inc()

return
}

// Workaround to reserve request body in ReverseProxy.ErrorHandler see
// more here: https://github.com/golang/go/issues/33726
//
if buf, ok := r.Context().Value("bodybuf").(*bytes.Buffer); ok {
r.Body = io.NopCloser(buf)
}

zap.L().Warn("handling a failed request", zap.String("provider", config.Name), zap.Error(e))

// route the request to a different target
h.metricRequestErrors.WithLabelValues(config.Name, "rerouted").Inc()
visitedTargets := GetVisitedTargetsFromContext(r)

// add the current target to the VisitedTargets slice to exclude it when selecting
// the next target
ctx := context.WithValue(r.Context(), VisitedTargets, append(visitedTargets, index))

// adding the targetname in case it errors out and needs to be
// used in metrics in ServeHTTP.
ctx = context.WithValue(ctx, TargetName, config.Name)

h.ServeHTTP(w, r.WithContext(ctx))
if r.StatusCode >= http.StatusInternalServerError {
return true
}

return false
}

func (h *Proxy) AddTarget(target TargetConfig, index uint) error {
proxy, err := NewReverseProxy(target, h.config)
func (h *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
return err
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}

// NOTE: any error returned from ModifyResponse will be handled by
// ErrorHandler
// proxy.ModifyResponse = h.doModifyResponse(config)
//
proxy.ModifyResponse = h.doModifyResponse(target) // nolint:bodyclose
proxy.ErrorHandler = h.doErrorHandler(target, index)

h.targets = append(
h.targets,
&HTTPTarget{
Config: target,
Proxy: proxy,
})

return nil
}
for _, target := range h.targets {
if errors.Is(r.Context().Err(), context.Canceled) {
return
}

func (h *Proxy) GetNextTarget() *HTTPTarget {
idx := h.healthcheckManager.GetNextHealthyTargetIndex()
if !h.healthcheckManager.IsTargetHealthy(target.Config.Name) {
continue
}

if idx < 0 {
return nil
}
c, cancel := context.WithTimeout(context.Background(), h.config.Proxy.UpstreamTimeout)
defer cancel()

return h.targets[idx]
}
r.Body = io.NopCloser(bytes.NewBuffer(body))

func (h *Proxy) GetNextTargetExcluding(indexes []uint) *HTTPTarget {
idx := h.healthcheckManager.GetNextHealthyTargetIndexExcluding(indexes)
resp, err := target.Do(c, r)
if err != nil {
continue
}
defer resp.Body.Close()

if idx < 0 {
return nil
}
h.metricResponseStatus.WithLabelValues(
target.Config.Name, strconv.Itoa(resp.StatusCode)).Inc()

return h.targets[idx]
}
if h.hasError(resp) {
continue
}

func (h *Proxy) GetNextTargetName() string {
return h.GetNextTarget().Config.Name
}
if _, err := io.Copy(w, resp.Body); err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)

func (h *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
visitedTargets := GetVisitedTargetsFromContext(r)

peer := h.GetNextTargetExcluding(visitedTargets)
if peer != nil {
start := time.Now()
peer.Proxy.ServeHTTP(w, r)
duration := time.Since(start)
h.metricResponseTime.WithLabelValues(peer.Config.Name, r.Method).Observe(duration.Seconds())
return
}

return
}
Expand Down
83 changes: 83 additions & 0 deletions internal/proxy/target.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package proxy

import (
"bytes"
"compress/gzip"
"context"
"io"
"net/http"
"strings"
"time"

"github.com/pkg/errors"
)

type HTTPTarget struct {
Config TargetConfig
ClientOptions HTTPTargetClientOptions
}

type HTTPTargetClientOptions struct {
Timeout time.Duration
}

func (h *HTTPTarget) gunzip(c context.Context, r *http.Request) (*http.Request, error) {
gz, err := gzip.NewReader(r.Body)
if err != nil {
return nil, err
}

/*
Function NewRequestWithContext sets Content-Length for known types ONLY.
Otherwise, you have read data before passing it through.
*/

data, err := io.ReadAll(gz)
if err != nil {
return nil, err
}

return http.NewRequestWithContext(c,
http.MethodPost, h.Config.Connection.HTTP.URL, bytes.NewReader(data))
}

func (h *HTTPTarget) buildRequest(c context.Context, r *http.Request) (*http.Request, error) {
ok := strings.Contains(r.Header.Get("Content-Encoding"), "gzip")

if h.Config.Connection.HTTP.Compression {
if ok {
req, err := http.NewRequestWithContext(c,
http.MethodPost, h.Config.Connection.HTTP.URL, r.Body)
req.Header.Set("Content-Encoding", "gzip")

return req, err
}

return h.gunzip(c, r)
}

if !ok {
return http.NewRequestWithContext(c,
http.MethodPost, h.Config.Connection.HTTP.URL, r.Body)
}

return h.gunzip(c, r)
}

func (h *HTTPTarget) Do(c context.Context, r *http.Request) (*http.Response, error) {
/*
The requset may be compressed or not.
The backend may support gzip or not.
*/

req, err := h.buildRequest(c, r)
if err != nil {
return nil, errors.Wrap(err, "request build failed")
}

client := &http.Client{
Timeout: h.ClientOptions.Timeout,
}

return client.Do(req)
}
4 changes: 0 additions & 4 deletions internal/rpcgateway/rpcgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ func (r *RPCGateway) Stop(ctx context.Context) error {
return r.server.Close()
}

func (r *RPCGateway) GetCurrentTarget() string {
return r.httpFailoverProxy.GetNextTargetName()
}

func NewRPCGateway(config RPCGatewayConfig) *RPCGateway {
healthcheckManager := proxy.NewHealthcheckManager(
proxy.HealthcheckManagerConfig{
Expand Down

0 comments on commit 22743f3

Please sign in to comment.