Skip to content

Commit

Permalink
node: add status code support to gRPC metrics (#409)
Browse files Browse the repository at this point in the history
Add the following metrics:

- `grpc_unary_status_code[method,status]`, unary calls grouped by
procedure (the `method` label) and status
- `grpc_server_stream_status_code[method,status]`, server streams grouped
by procedure (the `method` label) and status
  • Loading branch information
bas-vk authored Jul 24, 2024
1 parent cafc5a5 commit 2730008
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 41 deletions.
16 changes: 1 addition & 15 deletions core/node/rpc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@ import (
"log/slog"
"net/http"

"github.com/prometheus/client_golang/prometheus"

. "github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/dlog"
"github.com/river-build/river/core/node/infra"
)

const (
Expand All @@ -18,22 +15,14 @@ const (
type httpHandler struct {
base http.Handler
log *slog.Logger

counter *prometheus.CounterVec
}

var _ http.Handler = (*httpHandler)(nil)

func newHttpHandler(b http.Handler, l *slog.Logger, m infra.MetricsFactory) *httpHandler {
func newHttpHandler(b http.Handler, l *slog.Logger) *httpHandler {
return &httpHandler{
base: b,
log: l,
counter: m.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests",
},
[]string{"method", "path", "protocol", "status"},
),
}
}

Expand Down Expand Up @@ -62,7 +51,4 @@ func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Add(RequestIdHeader, id)

h.base.ServeHTTP(w, r)

// TODO: implement status reporting here
h.counter.WithLabelValues(r.Method, r.URL.Path, r.Proto, "TODO").Inc()
}
72 changes: 58 additions & 14 deletions core/node/rpc/metrics_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package rpc

import (
"context"
"errors"
"strings"

"connectrpc.com/connect"
"github.com/prometheus/client_golang/prometheus"
"github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/infra"
"github.com/river-build/river/core/node/shared"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand All @@ -15,18 +19,27 @@ type streamIdProvider interface {
}

type metricsInterceptor struct {
rpcDuration *prometheus.HistogramVec
unaryInflight *prometheus.GaugeVec
openClientStreams *prometheus.GaugeVec
openServerStreams *prometheus.GaugeVec
rpcDuration *prometheus.HistogramVec
unaryInflight *prometheus.GaugeVec
unaryStatusCode *prometheus.GaugeVec
openClientStreams *prometheus.GaugeVec
openServerStreams *prometheus.GaugeVec
serverStreamsStatusCode *prometheus.GaugeVec
}

func (s *Service) NewMetricsInterceptor() connect.Interceptor {
return &metricsInterceptor{
rpcDuration: s.rpcDuration,
unaryInflight: s.metrics.NewGaugeVecEx("grpc_unary_inflight", "gRPC unary calls in flight", "proc"),
openClientStreams: s.metrics.NewGaugeVecEx("grpc_open_client_streams", "gRPC open client streams", "proc"),
openServerStreams: s.metrics.NewGaugeVecEx("grpc_open_server_streams", "gRPC open server streams", "proc"),
rpcDuration: s.metrics.NewHistogramVecEx(
"rpc_duration_seconds",
"RPC duration in seconds",
infra.DefaultDurationBucketsSeconds,
"method",
),
unaryInflight: s.metrics.NewGaugeVecEx("grpc_unary_inflight", "gRPC unary calls in flight", "method"),
unaryStatusCode: s.metrics.NewGaugeVecEx("grpc_unary_status_code", "gRPC unary status code", "method", "status"),
openClientStreams: s.metrics.NewGaugeVecEx("grpc_open_client_streams", "gRPC open client streams", "method"),
openServerStreams: s.metrics.NewGaugeVecEx("grpc_open_server_streams", "gRPC open server streams", "method"),
serverStreamsStatusCode: s.metrics.NewGaugeVecEx("grpc_server_stream_status_code", "gRPC server stream status code", "method", "status"),
}
}

Expand All @@ -37,15 +50,16 @@ func (i *metricsInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc
) (connect.AnyResponse, error) {
var (
proc = req.Spec().Procedure
m = i.unaryInflight.With(prometheus.Labels{"proc": proc})
m = i.unaryInflight.With(prometheus.Labels{"method": proc})
s, _ = i.unaryStatusCode.CurryWith(prometheus.Labels{"method": proc})
)
m.Inc()

defer func() {
m.Dec()
prometheus.NewTimer(i.rpcDuration.WithLabelValues(proc)).ObserveDuration()
}()

// add streamId to tracing span
r, ok := req.Any().(streamIdProvider)
if ok {
Expand All @@ -56,7 +70,11 @@ func (i *metricsInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc
}
}

return next(ctx, req)
resp, err := next(ctx, req)

s.With(prometheus.Labels{"status": errorToStatus(err)}).Inc()

return resp, err
}
}

Expand All @@ -65,7 +83,7 @@ func (i *metricsInterceptor) WrapStreamingClient(next connect.StreamingClientFun
ctx context.Context,
spec connect.Spec,
) connect.StreamingClientConn {
m := i.openClientStreams.With(prometheus.Labels{"proc": spec.Procedure})
m := i.openClientStreams.With(prometheus.Labels{"method": spec.Procedure})

m.Inc()
defer m.Dec()
Expand All @@ -79,11 +97,37 @@ func (i *metricsInterceptor) WrapStreamingHandler(next connect.StreamingHandlerF
ctx context.Context,
conn connect.StreamingHandlerConn,
) error {
m := i.openClientStreams.With(prometheus.Labels{"proc": conn.Spec().Procedure})
var (
proc = conn.Spec().Procedure
m = i.openClientStreams.With(prometheus.Labels{"method": proc})
s, _ = i.serverStreamsStatusCode.CurryWith(prometheus.Labels{"method": proc})
)

m.Inc()
defer m.Dec()

return next(ctx, conn)
err := next(ctx, conn)

s.With(prometheus.Labels{"status": errorToStatus(err)}).Inc()

return err
}
}

// errorToStatus converts the error returned by an RPC handler into the string
// used as the "status" label value on the status-code metrics.
//
// Resolution order:
//   - nil error                          -> "success"
//   - chain contains *base.RiverErrorImpl -> its Code.String(), lower-cased
//   - chain contains *connect.Error       -> its Code().String(), unchanged
//   - anything else                       -> "unknown"
//
// The River error is checked first, so it takes precedence when both error
// types are present in the chain.
func errorToStatus(err error) string {
	if err == nil {
		return "success"
	}

	var (
		asRiver   *base.RiverErrorImpl
		asConnect *connect.Error
	)

	switch {
	case errors.As(err, &asRiver):
		return strings.ToLower(asRiver.Code.String())
	case errors.As(err, &asConnect):
		return asConnect.Code().String()
	default:
		return "unknown"
	}
}
10 changes: 2 additions & 8 deletions core/node/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,6 @@ func (s *Service) initInstance(mode string) {
s.metrics = infra.NewMetricsFactory(metricsRegistry, "river", subsystem)
s.metricsPublisher = infra.NewMetricsPublisher(metricsRegistry)
s.metricsPublisher.StartMetricsServer(s.serverCtx, s.config.Metrics)
s.rpcDuration = s.metrics.NewHistogramVecEx(
"rpc_duration_seconds",
"RPC duration in seconds",
infra.DefaultDurationBucketsSeconds,
"method",
)
}

func (s *Service) initWallet() error {
Expand Down Expand Up @@ -582,10 +576,10 @@ func (s *Service) initHandlers() {

interceptors := connect.WithInterceptors(ii...)
streamServicePattern, streamServiceHandler := protocolconnect.NewStreamServiceHandler(s, interceptors)
s.mux.Handle(streamServicePattern, newHttpHandler(streamServiceHandler, s.defaultLogger, s.metrics))
s.mux.Handle(streamServicePattern, newHttpHandler(streamServiceHandler, s.defaultLogger))

nodeServicePattern, nodeServiceHandler := protocolconnect.NewNodeToNodeHandler(s, interceptors)
s.mux.Handle(nodeServicePattern, newHttpHandler(nodeServiceHandler, s.defaultLogger, s.metrics))
s.mux.Handle(nodeServicePattern, newHttpHandler(nodeServiceHandler, s.defaultLogger))

s.registerDebugHandlers(s.config.EnableDebugEndpoints)
}
Expand Down
6 changes: 2 additions & 4 deletions core/node/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rpc

import (
"context"
river_sync "github.com/river-build/river/core/node/rpc/sync"
"log/slog"
"net"
"net/http"
Expand All @@ -11,8 +10,6 @@ import (

"connectrpc.com/otelconnect"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/node/auth"
"github.com/river-build/river/core/node/crypto"
Expand All @@ -21,8 +18,10 @@ import (
"github.com/river-build/river/core/node/nodes"
. "github.com/river-build/river/core/node/protocol/protocolconnect"
"github.com/river-build/river/core/node/registries"
river_sync "github.com/river-build/river/core/node/rpc/sync"
"github.com/river-build/river/core/node/storage"
"github.com/river-build/river/core/xchain/entitlement"
"go.opentelemetry.io/otel/trace"
)

type Service struct {
Expand Down Expand Up @@ -77,7 +76,6 @@ type Service struct {
// Metrics
metrics infra.MetricsFactory
metricsPublisher *infra.MetricsPublisher
rpcDuration *prometheus.HistogramVec
otelTraceProvider trace.TracerProvider
otelTracer trace.Tracer
otelConnectIterceptor *otelconnect.Interceptor
Expand Down

0 comments on commit 2730008

Please sign in to comment.