From 10773a498480165ca8d3583fa77579ec93c81280 Mon Sep 17 00:00:00 2001 From: Marcin Wolny Date: Fri, 16 Feb 2024 16:29:03 +0100 Subject: [PATCH] Stop use interface and more accurate naming There's no reason for this code to use interface for a health check probes. What's more, returning an interface is wrong. We should pass interface, do not return it. --- .golangci.yml | 19 ++++- cmd/rpcgateway/main.go | 1 + internal/metrics/metrics.go | 1 + internal/proxy/healthchecker.go | 75 ++++++++----------- internal/proxy/healthchecker_test.go | 4 +- internal/proxy/healthchecker_utils.go | 1 + .../{manager.go => healthcheckmanager.go} | 59 ++++++++------- internal/proxy/proxy.go | 4 +- internal/proxy/proxy_test.go | 8 +- internal/rpcgateway/rpcgateway.go | 7 +- 10 files changed, 95 insertions(+), 84 deletions(-) rename internal/proxy/{manager.go => healthcheckmanager.go} (57%) diff --git a/.golangci.yml b/.golangci.yml index e7385e3..b0ac6bc 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -3,14 +3,22 @@ run: linters: enable: + - asciicheck + - bidichk - bodyclose + - containedctx - depguard - dogsled - dupl + - durationcheck - errcheck + - errname - errorlint + - exhaustive - exportloopref + - forcetypeassert - funlen + - gochecknoglobals - gochecknoinits - gocognit - goconst @@ -25,21 +33,30 @@ linters: - gosimple - govet - ineffassign + - ireturn + - maintidx + - makezero - misspell - nakedret - nestif + - nilerr + - nilnil + - nlreturn - prealloc + - predeclared - revive - rowserrcheck - staticcheck - stylecheck + - tagliatelle + - tenv + - thelper - tparallel - typecheck - unconvert - unparam - unused - whitespace - - gochecknoglobals linters-settings: funlen: diff --git a/cmd/rpcgateway/main.go b/cmd/rpcgateway/main.go index 60d8d77..e729717 100644 --- a/cmd/rpcgateway/main.go +++ b/cmd/rpcgateway/main.go @@ -70,6 +70,7 @@ func main() { if err != nil { logger.Error("error when stopping rpc gateway", zap.Error(err)) } + return nil }) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 4378a40..1027817 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -15,6 +15,7 @@ type Server struct { func (s *Server) Start() error { zap.L().Info("metrics server starting", zap.String("listenAddr", s.server.Addr)) + return s.server.ListenAndServe() } diff --git a/internal/proxy/healthchecker.go b/internal/proxy/healthchecker.go index bb056cf..b24ef23 100644 --- a/internal/proxy/healthchecker.go +++ b/internal/proxy/healthchecker.go @@ -15,19 +15,7 @@ const ( userAgent = "rpc-gateway-health-check" ) -type Healthchecker interface { - Start(ctx context.Context) - Stop(ctx context.Context) error - IsHealthy() bool - BlockNumber() uint64 - GasLimit() uint64 - Taint() - RemoveTaint() - IsTainted() bool - Name() string -} - -type RPCHealthcheckerConfig struct { +type HealthCheckerConfig struct { URL string Name string // identifier imported from RPC gateway config @@ -53,10 +41,10 @@ const ( resetTaintWaitTimeAfterDuration = time.Minute * 5 ) -type RPCHealthchecker struct { +type HealthChecker struct { client *rpc.Client httpClient *http.Client - config RPCHealthcheckerConfig + config HealthCheckerConfig // latest known blockNumber from the RPC. blockNumber uint64 @@ -75,12 +63,10 @@ type RPCHealthchecker struct { // is the ethereum RPC node healthy according to the RPCHealthchecker isHealthy bool - // health check ticker - ticker *time.Ticker - mu sync.RWMutex + mu sync.RWMutex } -func NewHealthchecker(config RPCHealthcheckerConfig) (Healthchecker, error) { +func NewHealthChecker(config HealthCheckerConfig) (*HealthChecker, error) { client, err := rpc.Dial(config.URL) if err != nil { return nil, err @@ -88,7 +74,7 @@ func NewHealthchecker(config RPCHealthcheckerConfig) (Healthchecker, error) { client.SetHeader("User-Agent", userAgent) - healthchecker := &RPCHealthchecker{ + healthchecker := &HealthChecker{ client: client, httpClient: &http.Client{}, config: config, @@ -99,18 +85,19 @@ func NewHealthchecker(config RPCHealthcheckerConfig) (Healthchecker, error) { return healthchecker, nil } -func (h *RPCHealthchecker) Name() string { +func (h *HealthChecker) Name() string { return h.config.Name } -func (h *RPCHealthchecker) checkBlockNumber(ctx context.Context) (uint64, error) { +func (h *HealthChecker) checkBlockNumber(c context.Context) (uint64, error) { // First we check the block number reported by the node. This is later // used to evaluate a single RPC node against others var blockNumber hexutil.Uint64 - err := h.client.CallContext(ctx, &blockNumber, "eth_blockNumber") + err := h.client.CallContext(c, &blockNumber, "eth_blockNumber") if err != nil { zap.L().Warn("error fetching the block number", zap.Error(err), zap.String("name", h.config.Name)) + return 0, err } zap.L().Debug("fetched block", zap.Uint64("blockNumber", uint64(blockNumber)), zap.String("rpcProvider", h.config.Name)) @@ -122,11 +109,12 @@ func (h *RPCHealthchecker) checkBlockNumber(ctx context.Context) (uint64, error) // want to perform an eth_call to make sure eth_call requests are also succeding // as blockNumber can be either cached or routed to a different service on the // RPC provider's side. -func (h *RPCHealthchecker) checkGasLimit(ctx context.Context) (uint64, error) { - gasLimit, err := performGasLeftCall(ctx, h.httpClient, h.config.URL) +func (h *HealthChecker) checkGasLimit(c context.Context) (uint64, error) { + gasLimit, err := performGasLeftCall(c, h.httpClient, h.config.URL) zap.L().Debug("fetched gas limit", zap.Uint64("gasLimit", gasLimit), zap.String("rpcProvider", h.config.Name)) if err != nil { zap.L().Warn("failed fetching the gas limit", zap.Error(err), zap.String("rpcProvider", h.config.Name)) + return gasLimit, err } @@ -137,13 +125,13 @@ func (h *RPCHealthchecker) checkGasLimit(ctx context.Context) (uint64, error) { // - `eth_blockNumber` - to get the latest block reported by the node // - `eth_call` - to get the gas limit // And sets the health status based on the responses. -func (h *RPCHealthchecker) CheckAndSetHealth() { +func (h *HealthChecker) CheckAndSetHealth() { go h.checkAndSetBlockNumberHealth() go h.checkAndSetGasLeftHealth() } -func (h *RPCHealthchecker) checkAndSetBlockNumberHealth() { - ctx, cancel := context.WithTimeout(context.Background(), h.config.Timeout) +func (h *HealthChecker) checkAndSetBlockNumberHealth() { + c, cancel := context.WithTimeout(context.Background(), h.config.Timeout) defer cancel() // TODO @@ -151,7 +139,7 @@ func (h *RPCHealthchecker) checkAndSetBlockNumberHealth() { // This should be moved to a different place, because it does not do a // health checking but it provides additional context. - blockNumber, err := h.checkBlockNumber(ctx) + blockNumber, err := h.checkBlockNumber(c) if err != nil { return } @@ -161,30 +149,31 @@ func (h *RPCHealthchecker) checkAndSetBlockNumberHealth() { h.blockNumber = blockNumber } -func (h *RPCHealthchecker) checkAndSetGasLeftHealth() { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, h.config.Timeout) +func (h *HealthChecker) checkAndSetGasLeftHealth() { + c, cancel := context.WithTimeout(context.Background(), h.config.Timeout) defer cancel() - gasLimit, err := h.checkGasLimit(ctx) + gasLimit, err := h.checkGasLimit(c) h.mu.Lock() defer h.mu.Unlock() if err != nil { h.isHealthy = false + return } h.gasLimit = gasLimit h.isHealthy = true } -func (h *RPCHealthchecker) Start(ctx context.Context) { +func (h *HealthChecker) Start(c context.Context) { h.CheckAndSetHealth() + ticker := time.NewTicker(h.config.Interval) defer ticker.Stop() - h.ticker = ticker + for { select { - case <-ctx.Done(): + case <-c.Done(): return case <-ticker.C: h.CheckAndSetHealth() @@ -192,12 +181,12 @@ func (h *RPCHealthchecker) Start(ctx context.Context) { } } -func (h *RPCHealthchecker) Stop(_ context.Context) error { +func (h *HealthChecker) Stop(_ context.Context) error { // TODO: Additional cleanups? return nil } -func (h *RPCHealthchecker) IsHealthy() bool { +func (h *HealthChecker) IsHealthy() bool { h.mu.RLock() defer h.mu.RUnlock() @@ -209,28 +198,28 @@ func (h *RPCHealthchecker) IsHealthy() bool { return h.isHealthy } -func (h *RPCHealthchecker) BlockNumber() uint64 { +func (h *HealthChecker) BlockNumber() uint64 { h.mu.RLock() defer h.mu.RUnlock() return h.blockNumber } -func (h *RPCHealthchecker) GasLimit() uint64 { +func (h *HealthChecker) GasLimit() uint64 { h.mu.RLock() defer h.mu.RUnlock() return h.gasLimit } -func (h *RPCHealthchecker) IsTainted() bool { +func (h *HealthChecker) IsTainted() bool { h.mu.RLock() defer h.mu.RUnlock() return h.isTainted } -func (h *RPCHealthchecker) Taint() { +func (h *HealthChecker) Taint() { h.mu.Lock() defer h.mu.Unlock() @@ -255,7 +244,7 @@ func (h *RPCHealthchecker) Taint() { }() } -func (h *RPCHealthchecker) RemoveTaint() { +func (h *HealthChecker) RemoveTaint() { h.mu.Lock() defer h.mu.Unlock() diff --git a/internal/proxy/healthchecker_test.go b/internal/proxy/healthchecker_test.go index bf798c2..42ec390 100644 --- a/internal/proxy/healthchecker_test.go +++ b/internal/proxy/healthchecker_test.go @@ -22,7 +22,7 @@ func TestBasicHealthchecker(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - healtcheckConfig := RPCHealthcheckerConfig{ + healtcheckConfig := HealthCheckerConfig{ URL: env.GetDefault("RPC_GATEWAY_NODE_URL_1", "https://cloudflare-eth.com"), Interval: 1 * time.Second, Timeout: 2 * time.Second, @@ -30,7 +30,7 @@ func TestBasicHealthchecker(t *testing.T) { SuccessThreshold: 1, } - healthchecker, err := NewHealthchecker(healtcheckConfig) + healthchecker, err := NewHealthChecker(healtcheckConfig) assert.NoError(t, err) healthchecker.Start(ctx) diff --git a/internal/proxy/healthchecker_utils.go b/internal/proxy/healthchecker_utils.go index af55e67..2da9d7a 100644 --- a/internal/proxy/healthchecker_utils.go +++ b/internal/proxy/healthchecker_utils.go @@ -20,6 +20,7 @@ type JSONRPCResponse struct { func hexToUint(hexString string) (uint64, error) { hexString = strings.ReplaceAll(hexString, "0x", "") + return strconv.ParseUint(hexString, 16, 64) } diff --git a/internal/proxy/manager.go b/internal/proxy/healthcheckmanager.go similarity index 57% rename from internal/proxy/manager.go rename to internal/proxy/healthcheckmanager.go index ed09485..fb4d83b 100644 --- a/internal/proxy/manager.go +++ b/internal/proxy/healthcheckmanager.go @@ -10,13 +10,13 @@ import ( "go.uber.org/zap" ) -type HealthcheckManagerConfig struct { +type HealthCheckManagerConfig struct { Targets []NodeProviderConfig Config HealthCheckConfig } -type HealthcheckManager struct { - healthcheckers []Healthchecker +type HealthCheckManager struct { + hcs []*HealthChecker metricRPCProviderInfo *prometheus.GaugeVec metricRPCProviderStatus *prometheus.GaugeVec @@ -24,10 +24,10 @@ type HealthcheckManager struct { metricRPCProviderGasLimit *prometheus.GaugeVec } -func NewHealthcheckManager(config HealthcheckManagerConfig) *HealthcheckManager { - healthCheckers := []Healthchecker{} +func NewHealthCheckManager(config HealthCheckManagerConfig) *HealthCheckManager { + hcs := []*HealthChecker{} - healthcheckManager := &HealthcheckManager{ + hcm := &HealthCheckManager{ metricRPCProviderInfo: promauto.NewGaugeVec( prometheus.GaugeOpts{ Name: "zeroex_rpc_gateway_provider_info", @@ -61,8 +61,8 @@ func NewHealthcheckManager(config HealthcheckManagerConfig) *HealthcheckManager } for _, target := range config.Targets { - healthchecker, err := NewHealthchecker( - RPCHealthcheckerConfig{ + hc, err := NewHealthChecker( + HealthCheckerConfig{ URL: target.Connection.HTTP.URL, Name: target.Name, Interval: config.Config.Interval, @@ -75,20 +75,21 @@ func NewHealthcheckManager(config HealthcheckManagerConfig) *HealthcheckManager panic(err) } - healthCheckers = append(healthCheckers, healthchecker) + hcs = append(hcs, hc) } - healthcheckManager.healthcheckers = healthCheckers + hcm.hcs = hcs - return healthcheckManager + return hcm } -func (h *HealthcheckManager) runLoop(ctx context.Context) error { +func (h *HealthCheckManager) runLoop(c context.Context) error { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() + for { select { - case <-ctx.Done(): + case <-c.Done(): return nil case <-ticker.C: h.reportStatusMetrics() @@ -96,35 +97,35 @@ func (h *HealthcheckManager) runLoop(ctx context.Context) error { } } -func (h *HealthcheckManager) reportStatusMetrics() { - for _, healthchecker := range h.healthcheckers { +func (h *HealthCheckManager) reportStatusMetrics() { + for _, hc := range h.hcs { healthy := 0 tainted := 0 - if healthchecker.IsHealthy() { + if hc.IsHealthy() { healthy = 1 } - if healthchecker.IsTainted() { + if hc.IsTainted() { tainted = 1 } - h.metricRPCProviderGasLimit.WithLabelValues(healthchecker.Name()).Set(float64(healthchecker.BlockNumber())) - h.metricRPCProviderBlockNumber.WithLabelValues(healthchecker.Name()).Set(float64(healthchecker.BlockNumber())) - h.metricRPCProviderStatus.WithLabelValues(healthchecker.Name(), "healthy").Set(float64(healthy)) - h.metricRPCProviderStatus.WithLabelValues(healthchecker.Name(), "tainted").Set(float64(tainted)) + h.metricRPCProviderGasLimit.WithLabelValues(hc.Name()).Set(float64(hc.BlockNumber())) + h.metricRPCProviderBlockNumber.WithLabelValues(hc.Name()).Set(float64(hc.BlockNumber())) + h.metricRPCProviderStatus.WithLabelValues(hc.Name(), "healthy").Set(float64(healthy)) + h.metricRPCProviderStatus.WithLabelValues(hc.Name(), "tainted").Set(float64(tainted)) } } -func (h *HealthcheckManager) Start(ctx context.Context) error { - for index, healthChecker := range h.healthcheckers { - h.metricRPCProviderInfo.WithLabelValues(strconv.Itoa(index), healthChecker.Name()).Set(1) - go healthChecker.Start(ctx) +func (h *HealthCheckManager) Start(c context.Context) error { + for i, hc := range h.hcs { + h.metricRPCProviderInfo.WithLabelValues(strconv.Itoa(i), hc.Name()).Set(1) + go hc.Start(c) } - return h.runLoop(ctx) + return h.runLoop(c) } -func (h *HealthcheckManager) Stop(ctx context.Context) error { - for _, healthChecker := range h.healthcheckers { - err := healthChecker.Stop(ctx) +func (h *HealthCheckManager) Stop(c context.Context) error { + for _, hc := range h.hcs { + err := hc.Stop(c) if err != nil { zap.L().Error("healtchecker stop error", zap.Error(err)) } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 0f6ea69..5e90d7a 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -14,14 +14,14 @@ import ( type Proxy struct { config Config targets []*NodeProvider - healthcheckManager *HealthcheckManager + healthcheckManager *HealthCheckManager metricResponseTime *prometheus.HistogramVec metricRequestErrors *prometheus.CounterVec metricResponseStatus *prometheus.CounterVec } -func NewProxy(proxyConfig Config, healthCheckManager *HealthcheckManager) *Proxy { +func NewProxy(proxyConfig Config, healthCheckManager *HealthCheckManager) *Proxy { proxy := &Proxy{ config: proxyConfig, healthcheckManager: healthCheckManager, diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index e073b01..e2791ad 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -64,7 +64,7 @@ func TestHttpFailoverProxyRerouteRequests(t *testing.T) { }, }, } - healthcheckManager := NewHealthcheckManager(HealthcheckManagerConfig{ + healthcheckManager := NewHealthCheckManager(HealthCheckManagerConfig{ Targets: rpcGatewayConfig.Targets, Config: rpcGatewayConfig.HealthChecks, }) @@ -115,7 +115,7 @@ func TestHttpFailoverProxyDecompressRequest(t *testing.T) { }, } - healthcheckManager := NewHealthcheckManager(HealthcheckManagerConfig{ + healthcheckManager := NewHealthCheckManager(HealthCheckManagerConfig{ Targets: rpcGatewayConfig.Targets, Config: rpcGatewayConfig.HealthChecks, }) @@ -169,7 +169,7 @@ func TestHttpFailoverProxyWithCompressionSupportedTarget(t *testing.T) { }, } - healthcheckManager := NewHealthcheckManager(HealthcheckManagerConfig{ + healthcheckManager := NewHealthCheckManager(HealthCheckManagerConfig{ Targets: rpcGatewayConfig.Targets, Config: rpcGatewayConfig.HealthChecks, }) @@ -233,7 +233,7 @@ func TestHTTPFailoverProxyWhenCannotConnectToPrimaryProvider(t *testing.T) { }, }, } - healthcheckManager := NewHealthcheckManager(HealthcheckManagerConfig{ + healthcheckManager := NewHealthCheckManager(HealthCheckManagerConfig{ Targets: rpcGatewayConfig.Targets, Config: rpcGatewayConfig.HealthChecks, }) diff --git a/internal/rpcgateway/rpcgateway.go b/internal/rpcgateway/rpcgateway.go index f665adb..99ac5a2 100644 --- a/internal/rpcgateway/rpcgateway.go +++ b/internal/rpcgateway/rpcgateway.go @@ -21,7 +21,7 @@ import ( type RPCGateway struct { config RPCGatewayConfig httpFailoverProxy *proxy.Proxy - healthcheckManager *proxy.HealthcheckManager + healthcheckManager *proxy.HealthCheckManager server *http.Server } @@ -52,12 +52,13 @@ func (r *RPCGateway) Stop(ctx context.Context) error { if err != nil { zap.L().Error("healthcheck manager failed to stop gracefully", zap.Error(err)) } + return r.server.Close() } func NewRPCGateway(config RPCGatewayConfig) *RPCGateway { - healthcheckManager := proxy.NewHealthcheckManager( - proxy.HealthcheckManagerConfig{ + healthcheckManager := proxy.NewHealthCheckManager( + proxy.HealthCheckManagerConfig{ Targets: config.Targets, Config: config.HealthChecks, })