Skip to content
This repository has been archived by the owner on Jul 22, 2024. It is now read-only.

flowmatic to the clean up! #166

Merged
merged 1 commit into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ linters-settings:
- github.com/purini-to/zapmw
- github.com/caitlinelfring/go-env-default
- github.com/go-http-utils/headers
- github.com/carlmjohnson/flowmatic

issues:
max-same-issues: 0 # unlimited
Expand Down
49 changes: 17 additions & 32 deletions cmd/rpcgateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@ package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/0xProject/rpc-gateway/internal/metrics"
"github.com/0xProject/rpc-gateway/internal/rpcgateway"
"github.com/carlmjohnson/flowmatic"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

func main() {
topCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
c, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

debugLogEnabled := os.Getenv("DEBUG") == "true"
Expand All @@ -26,9 +25,11 @@ func main() {
zapConfig := zap.NewProductionConfig()
zapConfig.Level = zap.NewAtomicLevelAt(logLevel)
logger, _ := zapConfig.Build()

// We replace the global logger with this initialized here for simplyfication.
// Do see: https://github.com/uber-go/zap/blob/master/FAQ.md#why-include-package-global-loggers
// ref: https://pkg.go.dev/go.uber.org/zap?utm_source=godoc#ReplaceGlobals
//
zap.ReplaceGlobals(logger)
defer func() {
err := logger.Sync() // flushes buffer, if any
Expand All @@ -37,8 +38,6 @@ func main() {
}
}()

g, gCtx := errgroup.WithContext(topCtx)

// Initialize config
configFileLocation := flag.String("config", "./config.yml", "path to rpc gateway config file")
flag.Parse()
Expand All @@ -47,34 +46,20 @@ func main() {
logger.Fatal("failed to get config", zap.Error(err))
}

// start gateway
rpcGateway := rpcgateway.NewRPCGateway(*config)
service := rpcgateway.NewRPCGateway(*config)

// start healthz and metrics server
metricsServer := metrics.NewServer(config.Metrics)
g.Go(func() error {
return metricsServer.Start()
})
err = flowmatic.Do(
func() error {
return errors.Wrap(service.Start(c), "cannot start a service")
},
func() error {
<-c.Done()

g.Go(func() error {
return rpcGateway.Start(context.TODO())
})
return errors.Wrap(service.Stop(c), "cannot stop a service")
},
)

g.Go(func() error {
<-gCtx.Done()
err := metricsServer.Stop()
if err != nil {
logger.Error("error when stopping healthserver", zap.Error(err))
}
err = rpcGateway.Stop(context.TODO())
if err != nil {
logger.Error("error when stopping rpc gateway", zap.Error(err))
}

return nil
})

if err := g.Wait(); err != nil {
fmt.Printf("exit reason: %s \n", err)
if err != nil {
logger.Fatal("errors", zap.Error(err))
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/caitlinelfring/go-env-default v1.1.0
github.com/carlmjohnson/flowmatic v0.23.4
github.com/ethereum/go-ethereum v1.13.12
github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a
github.com/gorilla/mux v1.8.1
Expand All @@ -21,6 +22,7 @@ require (
require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/carlmjohnson/deque v0.23.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPx
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/caitlinelfring/go-env-default v1.1.0 h1:bhDfXmUolvcIGfQCX8qevQX8wxC54NGz0aimoUnhvDM=
github.com/caitlinelfring/go-env-default v1.1.0/go.mod h1:tESXPr8zFPP/cRy3cwxrHBmjJIf2A1x/o4C9CET2rEk=
github.com/carlmjohnson/deque v0.23.1 h1:X2HOJM9xcglY03deMZ0oZ1V2xtbqYV7dJDnZiSZN4Ak=
github.com/carlmjohnson/deque v0.23.1/go.mod h1:LF5NJjICBrEOPx84pxPL4nCimy5n9NQjxKi5cXkh+8U=
github.com/carlmjohnson/flowmatic v0.23.4 h1:SfK6f+zKUlw4aga1ph+7/csqVeUAWnBxfqKN5gvQzzs=
github.com/carlmjohnson/flowmatic v0.23.4/go.mod h1:Jpvyl591Dvkt9chYpnVupjxlKvqkZ9CtCmqL4wfQD7U=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ=
Expand Down
11 changes: 4 additions & 7 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)

type Server struct {
server *http.Server
}

func (s *Server) Start() error {
zap.L().Info("metrics server starting", zap.String("listenAddr", s.server.Addr))

return s.server.ListenAndServe()
}

Expand All @@ -27,17 +24,17 @@ func NewServer(config Config) *Server {
mux := http.NewServeMux()

mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
fmt.Fprintf(w, "{\"healthy\":true}")
w.WriteHeader(http.StatusOK)
})
mux.Handle("/metrics", promhttp.Handler())

return &Server{
server: &http.Server{
Handler: mux,
Addr: fmt.Sprintf(":%d", config.Port),
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
ReadHeaderTimeout: 5 * time.Second,
WriteTimeout: time.Second * 15,
ReadTimeout: time.Second * 15,
ReadHeaderTimeout: time.Second * 5,
},
}
}
62 changes: 37 additions & 25 deletions internal/rpcgateway/rpcgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"os"
"time"

"github.com/0xProject/rpc-gateway/internal/metrics"
"github.com/0xProject/rpc-gateway/internal/proxy"
"github.com/carlmjohnson/flowmatic"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/purini-to/zapmw"
metrics "github.com/slok/go-http-metrics/metrics/prometheus"
prometheusMetrics "github.com/slok/go-http-metrics/metrics/prometheus"
"github.com/slok/go-http-metrics/middleware"
"github.com/slok/go-http-metrics/middleware/std"
"go.uber.org/zap"
Expand All @@ -19,39 +22,43 @@ import (
)

type RPCGateway struct {
config RPCGatewayConfig
proxy *proxy.Proxy
hcm *proxy.HealthCheckManager
server *http.Server
config RPCGatewayConfig
proxy *proxy.Proxy
hcm *proxy.HealthCheckManager
server *http.Server
metrics *metrics.Server
}

func (r *RPCGateway) ServeHTTP(w http.ResponseWriter, req *http.Request) {
r.server.Handler.ServeHTTP(w, req)
}

func (r *RPCGateway) Start(c context.Context) error {
zap.L().Info("starting rpc gateway")

go func() {
zap.L().Info("starting healthcheck manager")
err := r.hcm.Start(c)
if err != nil {
// TODO: Handle gracefully
zap.L().Fatal("failed to start healthcheck manager", zap.Error(err))
}
}()

return r.server.ListenAndServe()
return flowmatic.Do(
func() error {
return errors.Wrap(r.hcm.Start(c), "failed to start health check manager")
},
func() error {
return errors.Wrap(r.server.ListenAndServe(), "failed to start rpc-gateway")
},
func() error {
return errors.Wrap(r.metrics.Start(), "failed to start metrics server")
},
)
}

func (r *RPCGateway) Stop(c context.Context) error {
zap.L().Info("stopping rpc gateway")
err := r.hcm.Stop(c)
if err != nil {
zap.L().Error("healthcheck manager failed to stop gracefully", zap.Error(err))
}

return r.server.Close()
return flowmatic.Do(
func() error {
return errors.Wrap(r.hcm.Stop(c), "failed to stop health check manager")
},
func() error {
return errors.Wrap(r.server.Close(), "failed to stop rpc-gateway")
},
func() error {
return errors.Wrap(r.metrics.Stop(), "failed to stop metrics server")
},
)
}

func NewRPCGateway(config RPCGatewayConfig) *RPCGateway {
Expand All @@ -73,7 +80,7 @@ func NewRPCGateway(config RPCGatewayConfig) *RPCGateway {

r.Use(std.HandlerProvider("",
middleware.New(middleware.Config{
Recorder: metrics.NewRecorder(metrics.Config{}),
Recorder: prometheusMetrics.NewRecorder(prometheusMetrics.Config{}),
})),
)

Expand All @@ -88,6 +95,11 @@ func NewRPCGateway(config RPCGatewayConfig) *RPCGateway {
config: config,
proxy: proxy,
hcm: hcm,
metrics: metrics.NewServer(
metrics.Config{
Port: config.Metrics.Port,
},
),
server: &http.Server{
Addr: fmt.Sprintf(":%s", config.Proxy.Port),
Handler: r,
Expand Down
Loading