Skip to content

Commit

Permalink
node: add status code support to gRPC metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
bas-vk committed Jul 15, 2024
1 parent 1893e10 commit aa16939
Showing 1 changed file with 37 additions and 12 deletions.
49 changes: 37 additions & 12 deletions core/node/rpc/metrics_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rpc

import (
"context"
"fmt"

"connectrpc.com/connect"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -13,18 +14,22 @@ type streamIdProvider interface {
}

// metricsInterceptor groups the Prometheus collectors that the RPC
// interceptor methods in this file update around each call.
//
// NOTE(review): in this view the pre-existing fields are listed twice (the
// first four lines, then again inside the six-line list below). That looks
// like removed and added diff lines rendered together — confirm against the
// actual file, which can only declare each field once.
type metricsInterceptor struct {
rpcDuration *prometheus.HistogramVec
unaryInflight *prometheus.GaugeVec
openClientStreams *prometheus.GaugeVec
openServerStreams *prometheus.GaugeVec
// rpcDuration is observed through a prometheus timer keyed by procedure
// name in the unary wrapper.
rpcDuration *prometheus.HistogramVec
// unaryInflight is incremented before a unary call is dispatched and
// decremented when it returns; labelled by "proc".
unaryInflight *prometheus.GaugeVec
// unaryStatusCode is incremented once per finished unary call; labelled by
// "proc" and by "status" (the decimal value returned by statusCode).
unaryStatusCode *prometheus.GaugeVec
// openClientStreams is incremented/decremented around the streaming
// handler call in the visible code; labelled by "proc".
openClientStreams *prometheus.GaugeVec
// openServerStreams is registered as "gRPC open server streams"; the code
// that updates it is not visible in this view.
openServerStreams *prometheus.GaugeVec
// serverStreamsStatusCode is incremented once per finished streaming
// handler call; labelled by "proc" and "status".
serverStreamsStatusCode *prometheus.GaugeVec
}

// NewMetricsInterceptor returns a connect.Interceptor backed by a
// metricsInterceptor. It reuses the service's existing rpcDuration histogram
// and creates the gauge vectors through s.metrics.NewGaugeVecEx, passing the
// metric name, help text and label names ("proc", plus "status" for the
// status-code metrics).
//
// NOTE(review): the first four initializers below repeat inside the six-line
// list that follows; this looks like removed and added diff lines rendered
// together — confirm against the actual file.
//
// NOTE(review): the two *_status_code metrics are only ever incremented in
// the visible code; a counter vector may be a better fit than a gauge vector
// if the metrics factory offers one — confirm.
func (s *Service) NewMetricsInterceptor() connect.Interceptor {
return &metricsInterceptor{
rpcDuration: s.rpcDuration,
unaryInflight: s.metrics.NewGaugeVecEx("grpc_unary_inflight", "gRPC unary calls in flight", "proc"),
openClientStreams: s.metrics.NewGaugeVecEx("grpc_open_client_streams", "gRPC open client streams", "proc"),
openServerStreams: s.metrics.NewGaugeVecEx("grpc_open_server_streams", "gRPC open server streams", "proc"),
rpcDuration: s.rpcDuration,
unaryInflight: s.metrics.NewGaugeVecEx("grpc_unary_inflight", "gRPC unary calls in flight", "proc"),
unaryStatusCode: s.metrics.NewGaugeVecEx("grpc_unary_status_code", "gRPC unary status code", "proc", "status"),
openClientStreams: s.metrics.NewGaugeVecEx("grpc_open_client_streams", "gRPC open client streams", "proc"),
openServerStreams: s.metrics.NewGaugeVecEx("grpc_open_server_streams", "gRPC open server streams", "proc"),
serverStreamsStatusCode: s.metrics.NewGaugeVecEx("grpc_server_stream_status_code", "gRPC server stream status code", "proc", "status"),
}
}

Expand All @@ -36,9 +41,10 @@ func (i *metricsInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc
var (
proc = req.Spec().Procedure
m = i.unaryInflight.With(prometheus.Labels{"proc": proc})
s, _ = i.unaryStatusCode.CurryWith(prometheus.Labels{"proc": proc})
)
m.Inc()

defer func() {
m.Dec()
prometheus.NewTimer(i.rpcDuration.WithLabelValues(proc)).ObserveDuration()
Expand All @@ -51,8 +57,19 @@ func (i *metricsInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc
span.SetTag("streamId", r.GetStreamId())
}

return next(ctx, req)
resp, err := next(ctx, req)

s.With(prometheus.Labels{"status": fmt.Sprintf("%d", statusCode(err))}).Inc()

return resp, err
}
}

// statusCode converts the outcome of an RPC into the numeric value used for
// the "status" metric label. A non-nil error is mapped through
// connect.CodeOf; a nil error yields 0.
func statusCode(err error) uint32 {
	if err != nil {
		return uint32(connect.CodeOf(err))
	}
	// No error: report 0, the value gRPC uses for success.
	return 0
}

func (i *metricsInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
Expand All @@ -74,11 +91,19 @@ func (i *metricsInterceptor) WrapStreamingHandler(next connect.StreamingHandlerF
ctx context.Context,
conn connect.StreamingHandlerConn,
) error {
m := i.openClientStreams.With(prometheus.Labels{"proc": conn.Spec().Procedure})
var (
proc = conn.Spec().Procedure
m = i.openClientStreams.With(prometheus.Labels{"proc": proc})
s, _ = i.serverStreamsStatusCode.CurryWith(prometheus.Labels{"proc": proc})
)

m.Inc()
defer m.Dec()

return next(ctx, conn)
err := next(ctx, conn)

s.With(prometheus.Labels{"status": fmt.Sprintf("%d", statusCode(err))}).Inc()

return err
}
}

0 comments on commit aa16939

Please sign in to comment.