diff --git a/.golangci.yml b/.golangci.yml index b0ac6bc..4847901 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -77,6 +77,7 @@ linters-settings: - github.com/purini-to/zapmw - github.com/caitlinelfring/go-env-default - github.com/go-http-utils/headers + - github.com/carlmjohnson/flowmatic issues: max-same-issues: 0 # unlimited diff --git a/cmd/rpcgateway/main.go b/cmd/rpcgateway/main.go index e729717..058a7f4 100644 --- a/cmd/rpcgateway/main.go +++ b/cmd/rpcgateway/main.go @@ -3,19 +3,18 @@ package main import ( "context" "flag" - "fmt" "os" "os/signal" "syscall" - "github.com/0xProject/rpc-gateway/internal/metrics" "github.com/0xProject/rpc-gateway/internal/rpcgateway" + "github.com/carlmjohnson/flowmatic" + "github.com/pkg/errors" "go.uber.org/zap" - "golang.org/x/sync/errgroup" ) func main() { - topCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + c, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() debugLogEnabled := os.Getenv("DEBUG") == "true" @@ -26,9 +25,11 @@ func main() { zapConfig := zap.NewProductionConfig() zapConfig.Level = zap.NewAtomicLevelAt(logLevel) logger, _ := zapConfig.Build() + // We replace the global logger with this initialized here for simplyfication. // Do see: https://github.com/uber-go/zap/blob/master/FAQ.md#why-include-package-global-loggers // ref: https://pkg.go.dev/go.uber.org/zap?utm_source=godoc#ReplaceGlobals + // zap.ReplaceGlobals(logger) defer func() { err := logger.Sync() // flushes buffer, if any @@ -37,8 +38,6 @@ func main() { } }() - g, gCtx := errgroup.WithContext(topCtx) - // Initialize config configFileLocation := flag.String("config", "./config.yml", "path to rpc gateway config file") flag.Parse() @@ -47,34 +46,20 @@ func main() { logger.Fatal("failed to get config", zap.Error(err)) } - // start gateway - rpcGateway := rpcgateway.NewRPCGateway(*config) + service := rpcgateway.NewRPCGateway(*config) - // start healthz and metrics server - metricsServer := metrics.NewServer(config.Metrics) - g.Go(func() error { - return metricsServer.Start() - }) + err = flowmatic.Do( + func() error { + return errors.Wrap(service.Start(c), "cannot start a service") + }, + func() error { + <-c.Done() - g.Go(func() error { - return rpcGateway.Start(context.TODO()) - }) + return errors.Wrap(service.Stop(c), "cannot stop a service") + }, + ) - g.Go(func() error { - <-gCtx.Done() - err := metricsServer.Stop() - if err != nil { - logger.Error("error when stopping healthserver", zap.Error(err)) - } - err = rpcGateway.Stop(context.TODO()) - if err != nil { - logger.Error("error when stopping rpc gateway", zap.Error(err)) - } - - return nil - }) - - if err := g.Wait(); err != nil { - fmt.Printf("exit reason: %s \n", err) + if err != nil { + logger.Fatal("errors", zap.Error(err)) } } diff --git a/go.mod b/go.mod index c18992a..7e349c6 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/Shopify/toxiproxy v2.1.4+incompatible github.com/caitlinelfring/go-env-default v1.1.0 + github.com/carlmjohnson/flowmatic v0.23.4 github.com/ethereum/go-ethereum v1.13.12 github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a github.com/gorilla/mux v1.8.1 @@ -21,6 +22,7 @@ require ( require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/carlmjohnson/deque v0.23.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/deckarep/golang-set/v2 v2.6.0 // indirect diff --git a/go.sum b/go.sum index ec16a14..ac2f541 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,10 @@ github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPx github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU= github.com/caitlinelfring/go-env-default v1.1.0 h1:bhDfXmUolvcIGfQCX8qevQX8wxC54NGz0aimoUnhvDM= github.com/caitlinelfring/go-env-default v1.1.0/go.mod h1:tESXPr8zFPP/cRy3cwxrHBmjJIf2A1x/o4C9CET2rEk= +github.com/carlmjohnson/deque v0.23.1 h1:X2HOJM9xcglY03deMZ0oZ1V2xtbqYV7dJDnZiSZN4Ak= +github.com/carlmjohnson/deque v0.23.1/go.mod h1:LF5NJjICBrEOPx84pxPL4nCimy5n9NQjxKi5cXkh+8U= +github.com/carlmjohnson/flowmatic v0.23.4 h1:SfK6f+zKUlw4aga1ph+7/csqVeUAWnBxfqKN5gvQzzs= +github.com/carlmjohnson/flowmatic v0.23.4/go.mod h1:Jpvyl591Dvkt9chYpnVupjxlKvqkZ9CtCmqL4wfQD7U= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ= diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 1027817..4fa68e5 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -6,7 +6,6 @@ import ( "time" "github.com/prometheus/client_golang/prometheus/promhttp" - "go.uber.org/zap" ) type Server struct { @@ -14,8 +13,6 @@ type Server struct { } func (s *Server) Start() error { - zap.L().Info("metrics server starting", zap.String("listenAddr", s.server.Addr)) - return s.server.ListenAndServe() } @@ -27,7 +24,7 @@ func NewServer(config Config) *Server { mux := http.NewServeMux() mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { - fmt.Fprintf(w, "{\"healthy\":true}") + w.WriteHeader(http.StatusOK) }) mux.Handle("/metrics", promhttp.Handler()) @@ -35,9 +32,9 @@ func NewServer(config Config) *Server { server: &http.Server{ Handler: mux, Addr: fmt.Sprintf(":%d", config.Port), - WriteTimeout: 15 * time.Second, - ReadTimeout: 15 * time.Second, - ReadHeaderTimeout: 5 * time.Second, + WriteTimeout: time.Second * 15, + ReadTimeout: time.Second * 15, + ReadHeaderTimeout: time.Second * 5, }, } } diff --git a/internal/rpcgateway/rpcgateway.go b/internal/rpcgateway/rpcgateway.go index 048909b..2542bca 100644 --- a/internal/rpcgateway/rpcgateway.go +++ b/internal/rpcgateway/rpcgateway.go @@ -7,10 +7,13 @@ import ( "os" "time" + "github.com/0xProject/rpc-gateway/internal/metrics" "github.com/0xProject/rpc-gateway/internal/proxy" + "github.com/carlmjohnson/flowmatic" "github.com/gorilla/mux" + "github.com/pkg/errors" "github.com/purini-to/zapmw" - metrics "github.com/slok/go-http-metrics/metrics/prometheus" + prometheusMetrics "github.com/slok/go-http-metrics/metrics/prometheus" "github.com/slok/go-http-metrics/middleware" "github.com/slok/go-http-metrics/middleware/std" "go.uber.org/zap" @@ -19,10 +22,11 @@ import ( ) type RPCGateway struct { - config RPCGatewayConfig - proxy *proxy.Proxy - hcm *proxy.HealthCheckManager - server *http.Server + config RPCGatewayConfig + proxy *proxy.Proxy + hcm *proxy.HealthCheckManager + server *http.Server + metrics *metrics.Server } func (r *RPCGateway) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -30,28 +34,31 @@ func (r *RPCGateway) ServeHTTP(w http.ResponseWriter, req *http.Request) { } func (r *RPCGateway) Start(c context.Context) error { - zap.L().Info("starting rpc gateway") - - go func() { - zap.L().Info("starting healthcheck manager") - err := r.hcm.Start(c) - if err != nil { - // TODO: Handle gracefully - zap.L().Fatal("failed to start healthcheck manager", zap.Error(err)) - } - }() - - return r.server.ListenAndServe() + return flowmatic.Do( + func() error { + return errors.Wrap(r.hcm.Start(c), "failed to start health check manager") + }, + func() error { + return errors.Wrap(r.server.ListenAndServe(), "failed to start rpc-gateway") + }, + func() error { + return errors.Wrap(r.metrics.Start(), "failed to start metrics server") + }, + ) } func (r *RPCGateway) Stop(c context.Context) error { - zap.L().Info("stopping rpc gateway") - err := r.hcm.Stop(c) - if err != nil { - zap.L().Error("healthcheck manager failed to stop gracefully", zap.Error(err)) - } - - return r.server.Close() + return flowmatic.Do( + func() error { + return errors.Wrap(r.hcm.Stop(c), "failed to stop health check manager") + }, + func() error { + return errors.Wrap(r.server.Close(), "failed to stop rpc-gateway") + }, + func() error { + return errors.Wrap(r.metrics.Stop(), "failed to stop metrics server") + }, + ) } func NewRPCGateway(config RPCGatewayConfig) *RPCGateway { @@ -73,7 +80,7 @@ func NewRPCGateway(config RPCGatewayConfig) *RPCGateway { r.Use(std.HandlerProvider("", middleware.New(middleware.Config{ - Recorder: metrics.NewRecorder(metrics.Config{}), + Recorder: prometheusMetrics.NewRecorder(prometheusMetrics.Config{}), })), ) @@ -88,6 +95,11 @@ func NewRPCGateway(config RPCGatewayConfig) *RPCGateway { config: config, proxy: proxy, hcm: hcm, + metrics: metrics.NewServer( + metrics.Config{ + Port: config.Metrics.Port, + }, + ), server: &http.Server{ Addr: fmt.Sprintf(":%s", config.Proxy.Port), Handler: r,