Skip to content
This repository has been archived by the owner on Jul 22, 2024. It is now read-only.

Commit

Permalink
Merge pull request #134 from 0xProject/marcin/development
Browse files Browse the repository at this point in the history
Refactor
  • Loading branch information
eitu5ami authored Dec 9, 2023
2 parents 11ac3b5 + 84305b0 commit bda102e
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 226 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golang.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ concurrency:
cancel-in-progress: true

env:
GOLANGVERSION: 1.21
GOLANGVERSION: 1.21.5

jobs:
test:
Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ linters-settings:
- github.com/stretchr/testify
- github.com/slok/go-http-metrics
- github.com/purini-to/zapmw
- github.com/caitlinelfring/go-env-default

issues:
max-same-issues: 0 # unlimited
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21

require (
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/caitlinelfring/go-env-default v1.1.0
github.com/ethereum/go-ethereum v1.13.5
github.com/gorilla/mux v1.8.1
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWso
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/caitlinelfring/go-env-default v1.1.0 h1:bhDfXmUolvcIGfQCX8qevQX8wxC54NGz0aimoUnhvDM=
github.com/caitlinelfring/go-env-default v1.1.0/go.mod h1:tESXPr8zFPP/cRy3cwxrHBmjJIf2A1x/o4C9CET2rEk=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
5 changes: 3 additions & 2 deletions internal/proxy/healthchecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/caitlinelfring/go-env-default"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
Expand All @@ -22,7 +23,7 @@ func TestBasicHealthchecker(t *testing.T) {
defer cancel()

healtcheckConfig := RPCHealthcheckerConfig{
URL: "https://cloudflare-eth.com",
URL: env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://cloudflare-eth.com"),
Interval: 1 * time.Second,
Timeout: 2 * time.Second,
FailureThreshold: 1,
Expand All @@ -48,7 +49,7 @@ func TestBasicHealthchecker(t *testing.T) {

func TestGasLeftCall(t *testing.T) {
client := &http.Client{}
url := "https://cloudflare-eth.com"
url := env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://cloudflare-eth.com")

result, err := performGasLeftCall(context.TODO(), client, url)
assert.Nil(t, err)
Expand Down
7 changes: 4 additions & 3 deletions internal/proxy/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/caitlinelfring/go-env-default"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)
Expand All @@ -18,15 +19,15 @@ func TestHealthcheckManager(t *testing.T) {
Name: "Primary",
Connection: TargetConfigConnection{
HTTP: TargetConnectionHTTP{
URL: "https://cloudflare-eth.com",
URL: env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://cloudflare-eth.com"),
},
},
},
{
Name: "StandBy",
Connection: TargetConfigConnection{
HTTP: TargetConnectionHTTP{
URL: "https://cloudflare-eth.com",
URL: env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://eth.public-rpc.com"),
},
},
},
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestGetNextHealthyTargetIndexExcluding(t *testing.T) {
Name: "Primary",
Connection: TargetConfigConnection{
HTTP: TargetConnectionHTTP{
URL: "https://cloudflare-eth.com",
URL: env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://cloudflare-eth.com"),
},
},
},
Expand Down
138 changes: 25 additions & 113 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ package proxy

import (
"bytes"
"context"
"errors"
"io"
"net/http"
"net/http/httputil"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/zap"
)

type HTTPTarget struct {
Expand Down Expand Up @@ -74,102 +70,21 @@ func NewProxy(proxyConfig Config, healthCheckManager *HealthcheckManager) *Proxy
}),
}

for index, target := range proxy.config.Targets {
if err := proxy.AddTarget(target, uint(index)); err != nil {
for _, target := range proxy.config.Targets {
if err := proxy.AddTarget(target); err != nil {
panic(err)
}
}

return proxy
}

func (h *Proxy) doModifyResponse(config TargetConfig) func(*http.Response) error {
return func(resp *http.Response) error {
h.metricResponseStatus.WithLabelValues(config.Name, strconv.Itoa(resp.StatusCode)).Inc()

switch {
// Here's the thing. A different provider may response with a
// different status code for the same query. e.g. call for
// a block that does not exist, Alchemy will serve HTTP 400
// where Infura will serve HTTP 200. Both of these responses
// hold a concrete error in jsonrpc message.
//
// Having this in mind, we may consider a provider unreliable
// upon these events:
// - HTTP 5xx responses
// - Cannot make a connection after X of retries.
//
// Everything else, as long as it's jsonrpc payload should be
// considered as successful response.
//
case resp.StatusCode == http.StatusTooManyRequests:
// this code generates a fallback to backup provider.
//
zap.L().Warn("rate limited", zap.String("provider", config.Name))

return errors.New("rate limited")

case resp.StatusCode >= http.StatusInternalServerError:
// this code generates a fallback to backup provider.
//
zap.L().Warn("server error", zap.String("provider", config.Name))

return errors.New("server error")
}

return nil
}
}

func (h *Proxy) doErrorHandler(config TargetConfig, index uint) func(http.ResponseWriter, *http.Request, error) {
return func(w http.ResponseWriter, r *http.Request, e error) {
// The client canceled the request (e.g. 0x API has a 5s timeout for RPC request)
// we stop here as it doesn't make sense to retry/reroute anymore.
// Also, we don't want to observe a client-canceled request as a failure
if errors.Is(e, context.Canceled) {
h.metricRequestErrors.WithLabelValues(config.Name, "client_closed_connection").Inc()

return
}

// Workaround to reserve request body in ReverseProxy.ErrorHandler see
// more here: https://github.com/golang/go/issues/33726
//
if buf, ok := r.Context().Value("bodybuf").(*bytes.Buffer); ok {
r.Body = io.NopCloser(buf)
}

zap.L().Warn("handling a failed request", zap.String("provider", config.Name), zap.Error(e))

// route the request to a different target
h.metricRequestErrors.WithLabelValues(config.Name, "rerouted").Inc()
visitedTargets := GetVisitedTargetsFromContext(r)

// add the current target to the VisitedTargets slice to exclude it when selecting
// the next target
ctx := context.WithValue(r.Context(), VisitedTargets, append(visitedTargets, index))

// adding the targetname in case it errors out and needs to be
// used in metrics in ServeHTTP.
ctx = context.WithValue(ctx, TargetName, config.Name)

h.ServeHTTP(w, r.WithContext(ctx))
}
}

func (h *Proxy) AddTarget(target TargetConfig, index uint) error {
func (h *Proxy) AddTarget(target TargetConfig) error {
proxy, err := NewReverseProxy(target, h.config)
if err != nil {
return err
}

// NOTE: any error returned from ModifyResponse will be handled by
// ErrorHandler
// proxy.ModifyResponse = h.doModifyResponse(config)
//
proxy.ModifyResponse = h.doModifyResponse(target) // nolint:bodyclose
proxy.ErrorHandler = h.doErrorHandler(target, index)

h.targets = append(
h.targets,
&HTTPTarget{
Expand All @@ -180,42 +95,39 @@ func (h *Proxy) AddTarget(target TargetConfig, index uint) error {
return nil
}

func (h *Proxy) GetNextTarget() *HTTPTarget {
idx := h.healthcheckManager.GetNextHealthyTargetIndex()
func (h *Proxy) HasNodeProviderFailed(statusCode int) bool {
return statusCode >= http.StatusInternalServerError || statusCode == http.StatusTooManyRequests
}

if idx < 0 {
return nil
func (h *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
body := &bytes.Buffer{}

if _, err := io.Copy(body, r.Body); err != nil {
http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
}

return h.targets[idx]
}
for _, target := range h.targets {
start := time.Now()

func (h *Proxy) GetNextTargetExcluding(indexes []uint) *HTTPTarget {
idx := h.healthcheckManager.GetNextHealthyTargetIndexExcluding(indexes)
pw := NewResponseWriter()
r.Body = io.NopCloser(bytes.NewBuffer(body.Bytes()))

if idx < 0 {
return nil
}
target.Proxy.ServeHTTP(pw, r)

return h.targets[idx]
}
if h.HasNodeProviderFailed(pw.statusCode) {
h.metricResponseTime.WithLabelValues(target.Config.Name, r.Method).Observe(time.Since(start).Seconds())
h.metricRequestErrors.WithLabelValues(target.Config.Name, "rerouted").Inc()

func (h *Proxy) GetNextTargetName() string {
return h.GetNextTarget().Config.Name
}
continue
}

func (h *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
visitedTargets := GetVisitedTargetsFromContext(r)
w.WriteHeader(pw.statusCode)
w.Write(pw.body.Bytes()) // nolint:errcheck

peer := h.GetNextTargetExcluding(visitedTargets)
if peer != nil {
start := time.Now()
peer.Proxy.ServeHTTP(w, r)
duration := time.Since(start)
h.metricResponseTime.WithLabelValues(peer.Config.Name, r.Method).Observe(duration.Seconds())
h.metricResponseTime.WithLabelValues(target.Config.Name, r.Method).Observe(time.Since(start).Seconds())

return
}

http.Error(w, "Service not available", http.StatusServiceUnavailable)
http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
}
28 changes: 0 additions & 28 deletions internal/proxy/proxy_utils.go

This file was deleted.

31 changes: 31 additions & 0 deletions internal/proxy/responsewriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package proxy

import (
"bytes"
"net/http"
)

type ReponseWriter struct {
body *bytes.Buffer
header http.Header
statusCode int
}

func (p *ReponseWriter) Header() http.Header {
return p.header
}

func (p *ReponseWriter) Write(b []byte) (int, error) {
return p.body.Write(b)
}

func (p *ReponseWriter) WriteHeader(statusCode int) {
p.statusCode = statusCode
}

func NewResponseWriter() *ReponseWriter {
return &ReponseWriter{
header: http.Header{},
body: &bytes.Buffer{},
}
}
Loading

0 comments on commit bda102e

Please sign in to comment.