Skip to content
This repository has been archived by the owner on Jul 22, 2024. It is now read-only.

Commit

Permalink
flowmatic to the clean up!
Browse files Browse the repository at this point in the history
  • Loading branch information
eitu5ami committed Feb 19, 2024
1 parent 6a154fe commit 779045e
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 64 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ linters-settings:
- github.com/purini-to/zapmw
- github.com/caitlinelfring/go-env-default
- github.com/go-http-utils/headers
- github.com/carlmjohnson/flowmatic

issues:
max-same-issues: 0 # unlimited
Expand Down
49 changes: 17 additions & 32 deletions cmd/rpcgateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@ package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/0xProject/rpc-gateway/internal/metrics"
"github.com/0xProject/rpc-gateway/internal/rpcgateway"
"github.com/carlmjohnson/flowmatic"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

func main() {
topCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
c, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

debugLogEnabled := os.Getenv("DEBUG") == "true"
Expand All @@ -26,9 +25,11 @@ func main() {
zapConfig := zap.NewProductionConfig()
zapConfig.Level = zap.NewAtomicLevelAt(logLevel)
logger, _ := zapConfig.Build()

// We replace the global logger with this initialized here for simplyfication.
// Do see: https://github.com/uber-go/zap/blob/master/FAQ.md#why-include-package-global-loggers
// ref: https://pkg.go.dev/go.uber.org/zap?utm_source=godoc#ReplaceGlobals
//
zap.ReplaceGlobals(logger)
defer func() {
err := logger.Sync() // flushes buffer, if any
Expand All @@ -37,8 +38,6 @@ func main() {
}
}()

g, gCtx := errgroup.WithContext(topCtx)

// Initialize config
configFileLocation := flag.String("config", "./config.yml", "path to rpc gateway config file")
flag.Parse()
Expand All @@ -47,34 +46,20 @@ func main() {
logger.Fatal("failed to get config", zap.Error(err))
}

// start gateway
rpcGateway := rpcgateway.NewRPCGateway(*config)
service := rpcgateway.NewRPCGateway(*config)

// start healthz and metrics server
metricsServer := metrics.NewServer(config.Metrics)
g.Go(func() error {
return metricsServer.Start()
})
err = flowmatic.Do(
func() error {
return errors.Wrap(service.Start(c), "cannot start a service")
},
func() error {
<-c.Done()

g.Go(func() error {
return rpcGateway.Start(context.TODO())
})
return errors.Wrap(service.Stop(c), "cannot stop a service")
},
)

g.Go(func() error {
<-gCtx.Done()
err := metricsServer.Stop()
if err != nil {
logger.Error("error when stopping healthserver", zap.Error(err))
}
err = rpcGateway.Stop(context.TODO())
if err != nil {
logger.Error("error when stopping rpc gateway", zap.Error(err))
}

return nil
})

if err := g.Wait(); err != nil {
fmt.Printf("exit reason: %s \n", err)
if err != nil {
logger.Fatal("errors", zap.Error(err))
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/caitlinelfring/go-env-default v1.1.0
github.com/carlmjohnson/flowmatic v0.23.4
github.com/ethereum/go-ethereum v1.13.12
github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a
github.com/gorilla/mux v1.8.1
Expand All @@ -21,6 +22,7 @@ require (
require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/carlmjohnson/deque v0.23.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPx
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/caitlinelfring/go-env-default v1.1.0 h1:bhDfXmUolvcIGfQCX8qevQX8wxC54NGz0aimoUnhvDM=
github.com/caitlinelfring/go-env-default v1.1.0/go.mod h1:tESXPr8zFPP/cRy3cwxrHBmjJIf2A1x/o4C9CET2rEk=
github.com/carlmjohnson/deque v0.23.1 h1:X2HOJM9xcglY03deMZ0oZ1V2xtbqYV7dJDnZiSZN4Ak=
github.com/carlmjohnson/deque v0.23.1/go.mod h1:LF5NJjICBrEOPx84pxPL4nCimy5n9NQjxKi5cXkh+8U=
github.com/carlmjohnson/flowmatic v0.23.4 h1:SfK6f+zKUlw4aga1ph+7/csqVeUAWnBxfqKN5gvQzzs=
github.com/carlmjohnson/flowmatic v0.23.4/go.mod h1:Jpvyl591Dvkt9chYpnVupjxlKvqkZ9CtCmqL4wfQD7U=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ=
Expand Down
11 changes: 4 additions & 7 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)

type Server struct {
server *http.Server
}

func (s *Server) Start() error {
zap.L().Info("metrics server starting", zap.String("listenAddr", s.server.Addr))

return s.server.ListenAndServe()
}

Expand All @@ -27,17 +24,17 @@ func NewServer(config Config) *Server {
mux := http.NewServeMux()

mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
fmt.Fprintf(w, "{\"healthy\":true}")
w.WriteHeader(http.StatusOK)
})
mux.Handle("/metrics", promhttp.Handler())

return &Server{
server: &http.Server{
Handler: mux,
Addr: fmt.Sprintf(":%d", config.Port),
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
ReadHeaderTimeout: 5 * time.Second,
WriteTimeout: time.Second * 15,
ReadTimeout: time.Second * 15,
ReadHeaderTimeout: time.Second * 5,
},
}
}
62 changes: 37 additions & 25 deletions internal/rpcgateway/rpcgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"os"
"time"

"github.com/0xProject/rpc-gateway/internal/metrics"
"github.com/0xProject/rpc-gateway/internal/proxy"
"github.com/carlmjohnson/flowmatic"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/purini-to/zapmw"
metrics "github.com/slok/go-http-metrics/metrics/prometheus"
prometheusMetrics "github.com/slok/go-http-metrics/metrics/prometheus"
"github.com/slok/go-http-metrics/middleware"
"github.com/slok/go-http-metrics/middleware/std"
"go.uber.org/zap"
Expand All @@ -19,39 +22,43 @@ import (
)

type RPCGateway struct {
config RPCGatewayConfig
proxy *proxy.Proxy
hcm *proxy.HealthCheckManager
server *http.Server
config RPCGatewayConfig
proxy *proxy.Proxy
hcm *proxy.HealthCheckManager
server *http.Server
metrics *metrics.Server
}

func (r *RPCGateway) ServeHTTP(w http.ResponseWriter, req *http.Request) {
r.server.Handler.ServeHTTP(w, req)
}

func (r *RPCGateway) Start(c context.Context) error {
zap.L().Info("starting rpc gateway")

go func() {
zap.L().Info("starting healthcheck manager")
err := r.hcm.Start(c)
if err != nil {
// TODO: Handle gracefully
zap.L().Fatal("failed to start healthcheck manager", zap.Error(err))
}
}()

return r.server.ListenAndServe()
return flowmatic.Do(
func() error {
return errors.Wrap(r.hcm.Start(c), "failed to start health check manager")
},
func() error {
return errors.Wrap(r.server.ListenAndServe(), "failed to start rpc-gateway")
},
func() error {
return errors.Wrap(r.metrics.Start(), "failed to start metrics server")
},
)
}

func (r *RPCGateway) Stop(c context.Context) error {
zap.L().Info("stopping rpc gateway")
err := r.hcm.Stop(c)
if err != nil {
zap.L().Error("healthcheck manager failed to stop gracefully", zap.Error(err))
}

return r.server.Close()
return flowmatic.Do(
func() error {
return errors.Wrap(r.hcm.Stop(c), "failed to stop health check manager")
},
func() error {
return errors.Wrap(r.server.Close(), "failed to stop rpc-gateway")
},
func() error {
return errors.Wrap(r.metrics.Stop(), "failed to stop metrics server")
},
)
}

func NewRPCGateway(config RPCGatewayConfig) *RPCGateway {
Expand All @@ -73,7 +80,7 @@ func NewRPCGateway(config RPCGatewayConfig) *RPCGateway {

r.Use(std.HandlerProvider("",
middleware.New(middleware.Config{
Recorder: metrics.NewRecorder(metrics.Config{}),
Recorder: prometheusMetrics.NewRecorder(prometheusMetrics.Config{}),
})),
)

Expand All @@ -88,6 +95,11 @@ func NewRPCGateway(config RPCGatewayConfig) *RPCGateway {
config: config,
proxy: proxy,
hcm: hcm,
metrics: metrics.NewServer(
metrics.Config{
Port: config.Metrics.Port,
},
),
server: &http.Server{
Addr: fmt.Sprintf(":%s", config.Proxy.Port),
Handler: r,
Expand Down

0 comments on commit 779045e

Please sign in to comment.