From 5364140bf6bd0084d9c25b875b19aa5c0410e97a Mon Sep 17 00:00:00 2001 From: Marcin Wolny Date: Fri, 16 Feb 2024 16:29:03 +0100 Subject: [PATCH] Stop use interface and more accurate naming There's no reason for this code to use interface for a health check probes. What's more, returning an interface is wrong. We should pass interface, do not return it. --- .golangci.yml | 19 +++- cmd/rpcgateway/main.go | 1 + internal/metrics/metrics.go | 1 + internal/proxy/healthchecker.go | 53 +++++----- internal/proxy/healthchecker_test.go | 4 +- internal/proxy/healthchecker_utils.go | 1 + internal/proxy/manager.go | 134 -------------------------- internal/proxy/proxy.go | 4 +- internal/proxy/proxy_test.go | 8 +- internal/rpcgateway/rpcgateway.go | 7 +- 10 files changed, 55 insertions(+), 177 deletions(-) delete mode 100644 internal/proxy/manager.go diff --git a/.golangci.yml b/.golangci.yml index e7385e3..b0ac6bc 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -3,14 +3,22 @@ run: linters: enable: + - asciicheck + - bidichk - bodyclose + - containedctx - depguard - dogsled - dupl + - durationcheck - errcheck + - errname - errorlint + - exhaustive - exportloopref + - forcetypeassert - funlen + - gochecknoglobals - gochecknoinits - gocognit - goconst @@ -25,21 +33,30 @@ linters: - gosimple - govet - ineffassign + - ireturn + - maintidx + - makezero - misspell - nakedret - nestif + - nilerr + - nilnil + - nlreturn - prealloc + - predeclared - revive - rowserrcheck - staticcheck - stylecheck + - tagliatelle + - tenv + - thelper - tparallel - typecheck - unconvert - unparam - unused - whitespace - - gochecknoglobals linters-settings: funlen: diff --git a/cmd/rpcgateway/main.go b/cmd/rpcgateway/main.go index 60d8d77..e729717 100644 --- a/cmd/rpcgateway/main.go +++ b/cmd/rpcgateway/main.go @@ -70,6 +70,7 @@ func main() { if err != nil { logger.Error("error when stopping rpc gateway", zap.Error(err)) } + return nil }) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 4378a40..1027817 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -15,6 +15,7 @@ type Server struct { func (s *Server) Start() error { zap.L().Info("metrics server starting", zap.String("listenAddr", s.server.Addr)) + return s.server.ListenAndServe() } diff --git a/internal/proxy/healthchecker.go b/internal/proxy/healthchecker.go index bb056cf..d1422fc 100644 --- a/internal/proxy/healthchecker.go +++ b/internal/proxy/healthchecker.go @@ -15,19 +15,7 @@ const ( userAgent = "rpc-gateway-health-check" ) -type Healthchecker interface { - Start(ctx context.Context) - Stop(ctx context.Context) error - IsHealthy() bool - BlockNumber() uint64 - GasLimit() uint64 - Taint() - RemoveTaint() - IsTainted() bool - Name() string -} - -type RPCHealthcheckerConfig struct { +type HealthCheckerConfig struct { URL string Name string // identifier imported from RPC gateway config @@ -53,10 +41,10 @@ const ( resetTaintWaitTimeAfterDuration = time.Minute * 5 ) -type RPCHealthchecker struct { +type HealthChecker struct { client *rpc.Client httpClient *http.Client - config RPCHealthcheckerConfig + config HealthCheckerConfig // latest known blockNumber from the RPC. blockNumber uint64 @@ -80,7 +68,7 @@ type RPCHealthchecker struct { mu sync.RWMutex } -func NewHealthchecker(config RPCHealthcheckerConfig) (Healthchecker, error) { +func NewHealthChecker(config HealthCheckerConfig) (*HealthChecker, error) { client, err := rpc.Dial(config.URL) if err != nil { return nil, err @@ -88,7 +76,7 @@ func NewHealthchecker(config RPCHealthcheckerConfig) (Healthchecker, error) { client.SetHeader("User-Agent", userAgent) - healthchecker := &RPCHealthchecker{ + healthchecker := &HealthChecker{ client: client, httpClient: &http.Client{}, config: config, @@ -99,11 +87,11 @@ func NewHealthchecker(config RPCHealthcheckerConfig) (Healthchecker, error) { return healthchecker, nil } -func (h *RPCHealthchecker) Name() string { +func (h *HealthChecker) Name() string { return h.config.Name } -func (h *RPCHealthchecker) checkBlockNumber(ctx context.Context) (uint64, error) { +func (h *HealthChecker) checkBlockNumber(ctx context.Context) (uint64, error) { // First we check the block number reported by the node. This is later // used to evaluate a single RPC node against others var blockNumber hexutil.Uint64 @@ -111,6 +99,7 @@ func (h *RPCHealthchecker) checkBlockNumber(ctx context.Context) (uint64, error) err := h.client.CallContext(ctx, &blockNumber, "eth_blockNumber") if err != nil { zap.L().Warn("error fetching the block number", zap.Error(err), zap.String("name", h.config.Name)) + return 0, err } zap.L().Debug("fetched block", zap.Uint64("blockNumber", uint64(blockNumber)), zap.String("rpcProvider", h.config.Name)) @@ -122,11 +111,12 @@ func (h *RPCHealthchecker) checkBlockNumber(ctx context.Context) (uint64, error) // want to perform an eth_call to make sure eth_call requests are also succeding // as blockNumber can be either cached or routed to a different service on the // RPC provider's side. -func (h *RPCHealthchecker) checkGasLimit(ctx context.Context) (uint64, error) { +func (h *HealthChecker) checkGasLimit(ctx context.Context) (uint64, error) { gasLimit, err := performGasLeftCall(ctx, h.httpClient, h.config.URL) zap.L().Debug("fetched gas limit", zap.Uint64("gasLimit", gasLimit), zap.String("rpcProvider", h.config.Name)) if err != nil { zap.L().Warn("failed fetching the gas limit", zap.Error(err), zap.String("rpcProvider", h.config.Name)) + return gasLimit, err } @@ -137,12 +127,12 @@ func (h *RPCHealthchecker) checkGasLimit(ctx context.Context) (uint64, error) { // - `eth_blockNumber` - to get the latest block reported by the node // - `eth_call` - to get the gas limit // And sets the health status based on the responses. -func (h *RPCHealthchecker) CheckAndSetHealth() { +func (h *HealthChecker) CheckAndSetHealth() { go h.checkAndSetBlockNumberHealth() go h.checkAndSetGasLeftHealth() } -func (h *RPCHealthchecker) checkAndSetBlockNumberHealth() { +func (h *HealthChecker) checkAndSetBlockNumberHealth() { ctx, cancel := context.WithTimeout(context.Background(), h.config.Timeout) defer cancel() @@ -161,7 +151,7 @@ func (h *RPCHealthchecker) checkAndSetBlockNumberHealth() { h.blockNumber = blockNumber } -func (h *RPCHealthchecker) checkAndSetGasLeftHealth() { +func (h *HealthChecker) checkAndSetGasLeftHealth() { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, h.config.Timeout) defer cancel() @@ -171,13 +161,14 @@ func (h *RPCHealthchecker) checkAndSetGasLeftHealth() { defer h.mu.Unlock() if err != nil { h.isHealthy = false + return } h.gasLimit = gasLimit h.isHealthy = true } -func (h *RPCHealthchecker) Start(ctx context.Context) { +func (h *HealthChecker) Start(ctx context.Context) { h.CheckAndSetHealth() ticker := time.NewTicker(h.config.Interval) defer ticker.Stop() @@ -192,12 +183,12 @@ func (h *RPCHealthchecker) Start(ctx context.Context) { } } -func (h *RPCHealthchecker) Stop(_ context.Context) error { +func (h *HealthChecker) Stop(_ context.Context) error { // TODO: Additional cleanups? return nil } -func (h *RPCHealthchecker) IsHealthy() bool { +func (h *HealthChecker) IsHealthy() bool { h.mu.RLock() defer h.mu.RUnlock() @@ -209,28 +200,28 @@ func (h *RPCHealthchecker) IsHealthy() bool { return h.isHealthy } -func (h *RPCHealthchecker) BlockNumber() uint64 { +func (h *HealthChecker) BlockNumber() uint64 { h.mu.RLock() defer h.mu.RUnlock() return h.blockNumber } -func (h *RPCHealthchecker) GasLimit() uint64 { +func (h *HealthChecker) GasLimit() uint64 { h.mu.RLock() defer h.mu.RUnlock() return h.gasLimit } -func (h *RPCHealthchecker) IsTainted() bool { +func (h *HealthChecker) IsTainted() bool { h.mu.RLock() defer h.mu.RUnlock() return h.isTainted } -func (h *RPCHealthchecker) Taint() { +func (h *HealthChecker) Taint() { h.mu.Lock() defer h.mu.Unlock() @@ -255,7 +246,7 @@ func (h *RPCHealthchecker) Taint() { }() } -func (h *RPCHealthchecker) RemoveTaint() { +func (h *HealthChecker) RemoveTaint() { h.mu.Lock() defer h.mu.Unlock() diff --git a/internal/proxy/healthchecker_test.go b/internal/proxy/healthchecker_test.go index bf798c2..42ec390 100644 --- a/internal/proxy/healthchecker_test.go +++ b/internal/proxy/healthchecker_test.go @@ -22,7 +22,7 @@ func TestBasicHealthchecker(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - healtcheckConfig := RPCHealthcheckerConfig{ + healtcheckConfig := HealthCheckerConfig{ URL: env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://cloudflare-eth.com"), Interval: 1 * time.Second, Timeout: 2 * time.Second, @@ -30,7 +30,7 @@ func TestBasicHealthchecker(t *testing.T) { SuccessThreshold: 1, } - healthchecker, err := NewHealthchecker(healtcheckConfig) + healthchecker, err := NewHealthChecker(healtcheckConfig) assert.NoError(t, err) healthchecker.Start(ctx) diff --git a/internal/proxy/healthchecker_utils.go b/internal/proxy/healthchecker_utils.go index af55e67..2da9d7a 100644 --- a/internal/proxy/healthchecker_utils.go +++ b/internal/proxy/healthchecker_utils.go @@ -20,6 +20,7 @@ type JSONRPCResponse struct { func hexToUint(hexString string) (uint64, error) { hexString = strings.ReplaceAll(hexString, "0x", "") + return strconv.ParseUint(hexString, 16, 64) } diff --git a/internal/proxy/manager.go b/internal/proxy/manager.go deleted file mode 100644 index ed09485..0000000 --- a/internal/proxy/manager.go +++ /dev/null @@ -1,134 +0,0 @@ -package proxy - -import ( - "context" - "strconv" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/zap" -) - -type HealthcheckManagerConfig struct { - Targets []NodeProviderConfig - Config HealthCheckConfig -} - -type HealthcheckManager struct { - healthcheckers []Healthchecker - - metricRPCProviderInfo *prometheus.GaugeVec - metricRPCProviderStatus *prometheus.GaugeVec - metricRPCProviderBlockNumber *prometheus.GaugeVec - metricRPCProviderGasLimit *prometheus.GaugeVec -} - -func NewHealthcheckManager(config HealthcheckManagerConfig) *HealthcheckManager { - healthCheckers := []Healthchecker{} - - healthcheckManager := &HealthcheckManager{ - metricRPCProviderInfo: promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "zeroex_rpc_gateway_provider_info", - Help: "Gas limit of a given provider", - }, []string{ - "index", - "provider", - }), - metricRPCProviderStatus: promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "zeroex_rpc_gateway_provider_status", - Help: "Current status of a given provider by type. Type can be either healthy or tainted.", - }, []string{ - "provider", - "type", - }), - metricRPCProviderBlockNumber: promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "zeroex_rpc_gateway_provider_block_number", - Help: "Block number of a given provider", - }, []string{ - "provider", - }), - metricRPCProviderGasLimit: promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "zeroex_rpc_gateway_provider_gasLimit_number", - Help: "Gas limit of a given provider", - }, []string{ - "provider", - }), - } - - for _, target := range config.Targets { - healthchecker, err := NewHealthchecker( - RPCHealthcheckerConfig{ - URL: target.Connection.HTTP.URL, - Name: target.Name, - Interval: config.Config.Interval, - Timeout: config.Config.Timeout, - FailureThreshold: config.Config.FailureThreshold, - SuccessThreshold: config.Config.SuccessThreshold, - }) - - if err != nil { - panic(err) - } - - healthCheckers = append(healthCheckers, healthchecker) - } - - healthcheckManager.healthcheckers = healthCheckers - - return healthcheckManager -} - -func (h *HealthcheckManager) runLoop(ctx context.Context) error { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - h.reportStatusMetrics() - } - } -} - -func (h *HealthcheckManager) reportStatusMetrics() { - for _, healthchecker := range h.healthcheckers { - healthy := 0 - tainted := 0 - if healthchecker.IsHealthy() { - healthy = 1 - } - if healthchecker.IsTainted() { - tainted = 1 - } - h.metricRPCProviderGasLimit.WithLabelValues(healthchecker.Name()).Set(float64(healthchecker.BlockNumber())) - h.metricRPCProviderBlockNumber.WithLabelValues(healthchecker.Name()).Set(float64(healthchecker.BlockNumber())) - h.metricRPCProviderStatus.WithLabelValues(healthchecker.Name(), "healthy").Set(float64(healthy)) - h.metricRPCProviderStatus.WithLabelValues(healthchecker.Name(), "tainted").Set(float64(tainted)) - } -} - -func (h *HealthcheckManager) Start(ctx context.Context) error { - for index, healthChecker := range h.healthcheckers { - h.metricRPCProviderInfo.WithLabelValues(strconv.Itoa(index), healthChecker.Name()).Set(1) - go healthChecker.Start(ctx) - } - - return h.runLoop(ctx) -} - -func (h *HealthcheckManager) Stop(ctx context.Context) error { - for _, healthChecker := range h.healthcheckers { - err := healthChecker.Stop(ctx) - if err != nil { - zap.L().Error("healtchecker stop error", zap.Error(err)) - } - } - - return nil -} diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 0f6ea69..5e90d7a 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -14,14 +14,14 @@ import ( type Proxy struct { config Config targets []*NodeProvider - healthcheckManager *HealthcheckManager + healthcheckManager *HealthCheckManager metricResponseTime *prometheus.HistogramVec metricRequestErrors *prometheus.CounterVec metricResponseStatus *prometheus.CounterVec } -func NewProxy(proxyConfig Config, healthCheckManager *HealthcheckManager) *Proxy { +func NewProxy(proxyConfig Config, healthCheckManager *HealthCheckManager) *Proxy { proxy := &Proxy{ config: proxyConfig, healthcheckManager: healthCheckManager, diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index e073b01..e2791ad 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -64,7 +64,7 @@ func TestHttpFailoverProxyRerouteRequests(t *testing.T) { }, }, } - healthcheckManager := NewHealthcheckManager(HealthcheckManagerConfig{ + healthcheckManager := NewHealthCheckManager(HealthCheckManagerConfig{ Targets: rpcGatewayConfig.Targets, Config: rpcGatewayConfig.HealthChecks, }) @@ -115,7 +115,7 @@ func TestHttpFailoverProxyDecompressRequest(t *testing.T) { }, } - healthcheckManager := NewHealthcheckManager(HealthcheckManagerConfig{ + healthcheckManager := NewHealthCheckManager(HealthCheckManagerConfig{ Targets: rpcGatewayConfig.Targets, Config: rpcGatewayConfig.HealthChecks, }) @@ -169,7 +169,7 @@ func TestHttpFailoverProxyWithCompressionSupportedTarget(t *testing.T) { }, } - healthcheckManager := NewHealthcheckManager(HealthcheckManagerConfig{ + healthcheckManager := NewHealthCheckManager(HealthCheckManagerConfig{ Targets: rpcGatewayConfig.Targets, Config: rpcGatewayConfig.HealthChecks, }) @@ -233,7 +233,7 @@ func TestHTTPFailoverProxyWhenCannotConnectToPrimaryProvider(t *testing.T) { }, }, } - healthcheckManager := NewHealthcheckManager(HealthcheckManagerConfig{ + healthcheckManager := NewHealthCheckManager(HealthCheckManagerConfig{ Targets: rpcGatewayConfig.Targets, Config: rpcGatewayConfig.HealthChecks, }) diff --git a/internal/rpcgateway/rpcgateway.go b/internal/rpcgateway/rpcgateway.go index f665adb..99ac5a2 100644 --- a/internal/rpcgateway/rpcgateway.go +++ b/internal/rpcgateway/rpcgateway.go @@ -21,7 +21,7 @@ import ( type RPCGateway struct { config RPCGatewayConfig httpFailoverProxy *proxy.Proxy - healthcheckManager *proxy.HealthcheckManager + healthcheckManager *proxy.HealthCheckManager server *http.Server } @@ -52,12 +52,13 @@ func (r *RPCGateway) Stop(ctx context.Context) error { if err != nil { zap.L().Error("healthcheck manager failed to stop gracefully", zap.Error(err)) } + return r.server.Close() } func NewRPCGateway(config RPCGatewayConfig) *RPCGateway { - healthcheckManager := proxy.NewHealthcheckManager( - proxy.HealthcheckManagerConfig{ + healthcheckManager := proxy.NewHealthCheckManager( + proxy.HealthCheckManagerConfig{ Targets: config.Targets, Config: config.HealthChecks, })