Skip to content

Commit

Permalink
node: add status code support to gRPC metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
bas-vk committed Jul 18, 2024
1 parent 3ffa5d8 commit 69db95b
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 41 deletions.
16 changes: 1 addition & 15 deletions core/node/rpc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@ import (
"log/slog"
"net/http"

"github.com/prometheus/client_golang/prometheus"

. "github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/dlog"
"github.com/river-build/river/core/node/infra"
)

const (
Expand All @@ -18,22 +15,14 @@ const (
// httpHandler is an http.Handler that wraps another handler together with a
// logger and a request counter.
type httpHandler struct {
// base is the wrapped handler; ServeHTTP calls base.ServeHTTP.
base http.Handler
// log is the logger supplied to newHttpHandler.
log *slog.Logger

// counter is a counter vector named "http_requests" with the labels
// method, path, protocol and status.
counter *prometheus.CounterVec
}

// Compile-time assertion that *httpHandler satisfies http.Handler.
var _ http.Handler = (*httpHandler)(nil)

// newHttpHandler returns an httpHandler that wraps b and uses l as its logger.
// NOTE(review): two signature lines and the counter initialisation appear
// together here, presumably because this listing interleaves the pre- and
// post-change versions of the function — confirm against the actual file.
func newHttpHandler(b http.Handler, l *slog.Logger, m infra.MetricsFactory) *httpHandler {
func newHttpHandler(b http.Handler, l *slog.Logger) *httpHandler {
return &httpHandler{
base: b,
log: l,
// Counter vector keyed by request method, URL path, protocol and status.
counter: m.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests",
},
[]string{"method", "path", "protocol", "status"},
),
}
}

Expand Down Expand Up @@ -62,7 +51,4 @@ func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Add(RequestIdHeader, id)

h.base.ServeHTTP(w, r)

// TODO: implement status reporting here
h.counter.WithLabelValues(r.Method, r.URL.Path, r.Proto, "TODO").Inc()
}
72 changes: 58 additions & 14 deletions core/node/rpc/metrics_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package rpc

import (
"context"
"errors"
"strings"

"connectrpc.com/connect"
"github.com/prometheus/client_golang/prometheus"
"github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/infra"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

Expand All @@ -13,18 +17,27 @@ type streamIdProvider interface {
}

// metricsInterceptor holds the Prometheus collectors that the interceptor
// methods update:
//   - rpcDuration: histogram observed per procedure for unary calls.
//   - unaryInflight: gauge incremented when a unary call starts and
//     decremented when it finishes.
//   - unaryStatusCode: gauge incremented after each unary call, labelled with
//     the procedure and the status derived from the returned error.
//   - openClientStreams / openServerStreams: gauges for open streams.
//   - serverStreamsStatusCode: gauge incremented after each streaming handler
//     returns, labelled with the procedure and the derived status.
//
// NOTE(review): the field list appears twice, presumably because this listing
// interleaves the pre- and post-change versions of the struct — confirm
// against the actual file.
// NOTE(review): in the visible code the streaming handler wrapper updates
// openClientStreams and nothing updates openServerStreams — verify whether
// that is intended.
type metricsInterceptor struct {
rpcDuration *prometheus.HistogramVec
unaryInflight *prometheus.GaugeVec
openClientStreams *prometheus.GaugeVec
openServerStreams *prometheus.GaugeVec
rpcDuration *prometheus.HistogramVec
unaryInflight *prometheus.GaugeVec
unaryStatusCode *prometheus.GaugeVec
openClientStreams *prometheus.GaugeVec
openServerStreams *prometheus.GaugeVec
serverStreamsStatusCode *prometheus.GaugeVec
}

// NewMetricsInterceptor returns a connect.Interceptor backed by a
// metricsInterceptor whose collectors are created via s.metrics
// (NewHistogramVecEx / NewGaugeVecEx).
//
// NOTE(review): both the "proc"-labelled and the "method"-labelled
// initialisers are shown, presumably because this listing interleaves the
// pre- and post-change versions of the function — confirm against the actual
// file.
// NOTE(review): the two status-code metrics are created as gauges, yet the
// visible call sites only ever call Inc() on them; a counter may be the
// better fit — confirm.
func (s *Service) NewMetricsInterceptor() connect.Interceptor {
return &metricsInterceptor{
rpcDuration: s.rpcDuration,
unaryInflight: s.metrics.NewGaugeVecEx("grpc_unary_inflight", "gRPC unary calls in flight", "proc"),
openClientStreams: s.metrics.NewGaugeVecEx("grpc_open_client_streams", "gRPC open client streams", "proc"),
openServerStreams: s.metrics.NewGaugeVecEx("grpc_open_server_streams", "gRPC open server streams", "proc"),
// Duration histogram keyed by the "method" label.
rpcDuration: s.metrics.NewHistogramVecEx(
"rpc_duration_seconds",
"RPC duration in seconds",
infra.DefaultDurationBucketsSeconds,
"method",
),
unaryInflight: s.metrics.NewGaugeVecEx("grpc_unary_inflight", "gRPC unary calls in flight", "method"),
unaryStatusCode: s.metrics.NewGaugeVecEx("grpc_unary_status_code", "gRPC unary status code", "method", "status"),
openClientStreams: s.metrics.NewGaugeVecEx("grpc_open_client_streams", "gRPC open client streams", "method"),
openServerStreams: s.metrics.NewGaugeVecEx("grpc_open_server_streams", "gRPC open server streams", "method"),
serverStreamsStatusCode: s.metrics.NewGaugeVecEx("grpc_server_stream_status_code", "gRPC server stream status code", "method", "status"),
}
}

Expand All @@ -35,10 +48,11 @@ func (i *metricsInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc
) (connect.AnyResponse, error) {
var (
proc = req.Spec().Procedure
m = i.unaryInflight.With(prometheus.Labels{"proc": proc})
m = i.unaryInflight.With(prometheus.Labels{"method": proc})
s, _ = i.unaryStatusCode.CurryWith(prometheus.Labels{"method": proc})
)
m.Inc()

defer func() {
m.Dec()
prometheus.NewTimer(i.rpcDuration.WithLabelValues(proc)).ObserveDuration()
Expand All @@ -51,7 +65,11 @@ func (i *metricsInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc
span.SetTag("streamId", r.GetStreamId())
}

return next(ctx, req)
resp, err := next(ctx, req)

s.With(prometheus.Labels{"status": errorToStatus(err)}).Inc()

return resp, err
}
}

Expand All @@ -60,7 +78,7 @@ func (i *metricsInterceptor) WrapStreamingClient(next connect.StreamingClientFun
ctx context.Context,
spec connect.Spec,
) connect.StreamingClientConn {
m := i.openClientStreams.With(prometheus.Labels{"proc": spec.Procedure})
m := i.openClientStreams.With(prometheus.Labels{"method": spec.Procedure})

m.Inc()
defer m.Dec()
Expand All @@ -74,11 +92,37 @@ func (i *metricsInterceptor) WrapStreamingHandler(next connect.StreamingHandlerF
ctx context.Context,
conn connect.StreamingHandlerConn,
) error {
m := i.openClientStreams.With(prometheus.Labels{"proc": conn.Spec().Procedure})
var (
proc = conn.Spec().Procedure
m = i.openClientStreams.With(prometheus.Labels{"method": proc})
s, _ = i.serverStreamsStatusCode.CurryWith(prometheus.Labels{"method": proc})
)

m.Inc()
defer m.Dec()

return next(ctx, conn)
err := next(ctx, conn)

s.With(prometheus.Labels{"status": errorToStatus(err)}).Inc()

return err
}
}

// errorToStatus converts the error returned by an RPC into the string used as
// the "status" metric label.
//
// A nil error yields "success". If the error chain contains a *connect.Error,
// the string form of its code is returned. Otherwise, if the chain contains a
// *base.RiverErrorImpl, the lower-cased string form of its Code is returned.
// Any other error yields "unknown".
func errorToStatus(err error) string {
	if err == nil {
		return "success"
	}

	var (
		rpcErr  *connect.Error
		nodeErr *base.RiverErrorImpl
	)

	// The connect error is probed first so it takes precedence when both
	// error types are present in the chain.
	switch {
	case errors.As(err, &rpcErr):
		return rpcErr.Code().String()
	case errors.As(err, &nodeErr):
		return strings.ToLower(nodeErr.Code.String())
	default:
		return "unknown"
	}
}
10 changes: 2 additions & 8 deletions core/node/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,6 @@ func (s *Service) initInstance(mode string) {
}
s.metrics = infra.NewMetrics("river", subsystem)
s.metrics.StartMetricsServer(s.serverCtx, s.config.Metrics)
s.rpcDuration = s.metrics.NewHistogramVecEx(
"rpc_duration_seconds",
"RPC duration in seconds",
infra.DefaultDurationBucketsSeconds,
"method",
)
}

func (s *Service) initWallet() error {
Expand Down Expand Up @@ -520,10 +514,10 @@ func (s *Service) initHandlers() {
NewTimeoutInterceptor(s.config.Network.RequestTimeout),
)
streamServicePattern, streamServiceHandler := protocolconnect.NewStreamServiceHandler(s, interceptors)
s.mux.Handle(streamServicePattern, newHttpHandler(streamServiceHandler, s.defaultLogger, s.metrics))
s.mux.Handle(streamServicePattern, newHttpHandler(streamServiceHandler, s.defaultLogger))

nodeServicePattern, nodeServiceHandler := protocolconnect.NewNodeToNodeHandler(s, interceptors)
s.mux.Handle(nodeServicePattern, newHttpHandler(nodeServiceHandler, s.defaultLogger, s.metrics))
s.mux.Handle(nodeServicePattern, newHttpHandler(nodeServiceHandler, s.defaultLogger))

s.registerDebugHandlers(s.config.EnableDebugEndpoints)
}
Expand Down
5 changes: 1 addition & 4 deletions core/node/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/node/auth"
"github.com/river-build/river/core/node/crypto"
Expand Down Expand Up @@ -72,8 +70,7 @@ type Service struct {
Archiver *Archiver

// Metrics
metrics *infra.Metrics
rpcDuration *prometheus.HistogramVec
metrics *infra.Metrics
}

var (
Expand Down

0 comments on commit 69db95b

Please sign in to comment.