Skip to content
This repository has been archived by the owner on Jul 22, 2024. It is now read-only.

Commit

Permalink
no reverse proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
eitu5ami committed Oct 5, 2023
1 parent b9c4880 commit b8c9936
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 210 deletions.
11 changes: 0 additions & 11 deletions internal/proxy/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,6 @@ func (h *HealthcheckManager) Stop(ctx context.Context) error {
return nil
}

func (h *HealthcheckManager) GetTargetIndexByName(name string) int {
for idx, healthChecker := range h.healthcheckers {
if healthChecker.Name() == name {
return idx
}
}

zap.L().Error("tried to access a non-existing Healthchecker", zap.String("name", name))
return 0
}

func (h *HealthcheckManager) GetTargetByName(name string) Healthchecker {
for _, healthChecker := range h.healthcheckers {
if healthChecker.Name() == name {
Expand Down
6 changes: 0 additions & 6 deletions internal/proxy/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,10 @@ func TestHealthcheckManager(t *testing.T) {
ctx := context.TODO()
go manager.Start(ctx)

nextIdx := manager.GetNextHealthyTargetIndex()
assert.Zero(t, nextIdx)

time.Sleep(1 * time.Second)

manager.TaintTarget("AnkrOne")

nextIdx = manager.GetNextHealthyTargetIndex()
assert.Equal(t, 1, nextIdx)

manager.Stop(ctx)
}

Expand Down
185 changes: 47 additions & 138 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,22 @@ package proxy
import (
"bytes"
"context"
"errors"
"io"
"net/http"
"net/http/httputil"
"strconv"
"time"

"github.com/pkg/errors"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/zap"
)

type HTTPTarget struct {
Config TargetConfig
Proxy *httputil.ReverseProxy
}

type Proxy struct {
config Config
targets []*HTTPTarget
healthcheckManager *HealthcheckManager

metricResponseTime *prometheus.HistogramVec
metricRequestErrors *prometheus.CounterVec
metricResponseStatus *prometheus.CounterVec
}

Expand Down Expand Up @@ -57,162 +49,79 @@ func NewProxy(proxyConfig Config, healthCheckManager *HealthcheckManager) *Proxy
"provider",
"method",
}),
metricRequestErrors: promauto.NewCounterVec(
metricResponseStatus: promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "zeroex_rpc_gateway_request_errors_handled_total",
Help: "The total number of request errors handled by gateway",
Name: "zeroex_rpc_gateway_target_response_status_total",
Help: "Total number of responses with a statuscode label",
}, []string{
"provider",
"type",
"status_code",
}),
metricResponseStatus: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "zeroex_rpc_gateway_target_response_status_total",
Help: "Total number of responses with a statuscode label",
}, []string{
"provider",
"status_code",
}),
}

for index, target := range proxy.config.Targets {
if err := proxy.AddTarget(target, uint(index)); err != nil {
panic(err)
for _, target := range proxy.config.Targets {
t := &HTTPTarget{
Config: target,
ClientOptions: HTTPTargetClientOptions{
Timeout: proxy.config.Proxy.UpstreamTimeout,
},
}
proxy.targets = append(proxy.targets, t)
}

return proxy
}

func (h *Proxy) doModifyResponse(config TargetConfig) func(*http.Response) error {
return func(resp *http.Response) error {
h.metricResponseStatus.WithLabelValues(config.Name, strconv.Itoa(resp.StatusCode)).Inc()

switch {
// Here's the thing. A different provider may response with a
// different status code for the same query. e.g. call for
// a block that does not exist, Alchemy will serve HTTP 400
// where Infura will serve HTTP 200. Both of these responses
// hold a concrete error in jsonrpc message.
//
// Having this in mind, we may consider a provider unreliable
// upon these events:
// - HTTP 5xx responses
// - Cannot make a connection after X of retries.
//
// Everything else, as long as it's jsonrpc payload should be
// considered as successful response.
//
case resp.StatusCode == http.StatusTooManyRequests:
// this code generates a fallback to backup provider.
//
zap.L().Warn("rate limited", zap.String("provider", config.Name))

return errors.New("rate limited")

case resp.StatusCode >= http.StatusInternalServerError:
// this code generates a fallback to backup provider.
//
zap.L().Warn("server error", zap.String("provider", config.Name))

return errors.New("server error")
}

return nil
func (h *Proxy) hasError(r *http.Response) bool {
if r.StatusCode == http.StatusTooManyRequests {
return true
}
}

func (h *Proxy) doErrorHandler(config TargetConfig, index uint) func(http.ResponseWriter, *http.Request, error) {
return func(w http.ResponseWriter, r *http.Request, e error) {
// The client canceled the request (e.g. 0x API has a 5s timeout for RPC request)
// we stop here as it doesn't make sense to retry/reroute anymore.
// Also, we don't want to observe a client-canceled request as a failure
if errors.Is(e, context.Canceled) {
h.metricRequestErrors.WithLabelValues(config.Name, "client_closed_connection").Inc()

return
}

// Workaround to reserve request body in ReverseProxy.ErrorHandler see
// more here: https://github.com/golang/go/issues/33726
//
if buf, ok := r.Context().Value("bodybuf").(*bytes.Buffer); ok {
r.Body = io.NopCloser(buf)
}

zap.L().Warn("handling a failed request", zap.String("provider", config.Name), zap.Error(e))

// route the request to a different target
h.metricRequestErrors.WithLabelValues(config.Name, "rerouted").Inc()
visitedTargets := GetVisitedTargetsFromContext(r)

// add the current target to the VisitedTargets slice to exclude it when selecting
// the next target
ctx := context.WithValue(r.Context(), VisitedTargets, append(visitedTargets, index))

// adding the targetname in case it errors out and needs to be
// used in metrics in ServeHTTP.
ctx = context.WithValue(ctx, TargetName, config.Name)

h.ServeHTTP(w, r.WithContext(ctx))
if r.StatusCode >= http.StatusInternalServerError {
return true
}

return false
}

func (h *Proxy) AddTarget(target TargetConfig, index uint) error {
proxy, err := NewReverseProxy(target, h.config)
func (h *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
return err
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}

// NOTE: any error returned from ModifyResponse will be handled by
// ErrorHandler
// proxy.ModifyResponse = h.doModifyResponse(config)
//
proxy.ModifyResponse = h.doModifyResponse(target) // nolint:bodyclose
proxy.ErrorHandler = h.doErrorHandler(target, index)

h.targets = append(
h.targets,
&HTTPTarget{
Config: target,
Proxy: proxy,
})

return nil
}
for _, target := range h.targets {
if errors.Is(r.Context().Err(), context.Canceled) {
return
}

func (h *Proxy) GetNextTarget() *HTTPTarget {
idx := h.healthcheckManager.GetNextHealthyTargetIndex()
if !h.healthcheckManager.IsTargetHealthy(target.Config.Name) {
continue
}

if idx < 0 {
return nil
}
c, cancel := context.WithTimeout(context.Background(), h.config.Proxy.UpstreamTimeout)
defer cancel()

return h.targets[idx]
}
r.Body = io.NopCloser(bytes.NewBuffer(body))

func (h *Proxy) GetNextTargetExcluding(indexes []uint) *HTTPTarget {
idx := h.healthcheckManager.GetNextHealthyTargetIndexExcluding(indexes)
resp, err := target.Do(c, r)
if err != nil {
continue
}
defer resp.Body.Close()

if idx < 0 {
return nil
}
h.metricResponseStatus.WithLabelValues(
target.Config.Name, strconv.Itoa(resp.StatusCode)).Inc()

return h.targets[idx]
}
if h.hasError(resp) {
continue
}

func (h *Proxy) GetNextTargetName() string {
return h.GetNextTarget().Config.Name
}
if _, err := io.Copy(w, resp.Body); err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)

func (h *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
visitedTargets := GetVisitedTargetsFromContext(r)

peer := h.GetNextTargetExcluding(visitedTargets)
if peer != nil {
start := time.Now()
peer.Proxy.ServeHTTP(w, r)
duration := time.Since(start)
h.metricResponseTime.WithLabelValues(peer.Config.Name, r.Method).Observe(duration.Seconds())
return
}

return
}
Expand Down
Loading

0 comments on commit b8c9936

Please sign in to comment.