Skip to content

Commit

Permalink
add basic metrics.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Sep 9, 2024
1 parent d034b12 commit 59b13af
Show file tree
Hide file tree
Showing 10 changed files with 504 additions and 62 deletions.
14 changes: 8 additions & 6 deletions cmd/maestro/server/auth_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,22 @@ func newAuthUnaryInterceptor(authNType string, authorizer grpcauthorizer.GRPCAut
}
}

// wrappedStream wraps a grpc.ServerStream associated with an incoming RPC, and
// wrappedAuthStream wraps a grpc.ServerStream associated with an incoming RPC, and
// a custom context containing the user and groups derived from the client certificate
// specified in the incoming RPC metadata
type wrappedStream struct {
type wrappedAuthStream struct {
grpc.ServerStream
ctx context.Context
}

func (w *wrappedStream) Context() context.Context {
// Context returns the context associated with the stream
func (w *wrappedAuthStream) Context() context.Context {
return w.ctx
}

func newWrappedStream(ctx context.Context, s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s, ctx}
// newWrappedAuthStream creates a new wrappedAuthStream
func newWrappedAuthStream(ctx context.Context, s grpc.ServerStream) grpc.ServerStream {
return &wrappedAuthStream{s, ctx}
}

// newAuthStreamInterceptor creates a stream interceptor that retrieves the user and groups
Expand Down Expand Up @@ -167,6 +169,6 @@ func newAuthStreamInterceptor(authNType string, authorizer grpcauthorizer.GRPCAu
return fmt.Errorf("unsupported authentication Type %s", authNType)
}

return handler(srv, newWrappedStream(newContextWithIdentity(ss.Context(), user, groups), ss))
return handler(srv, newWrappedAuthStream(newContextWithIdentity(ss.Context(), user, groups), ss))
}
}
17 changes: 11 additions & 6 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e
MaxVersion: tls.VersionTLS13,
}

// add metrics and auth interceptors
grpcServerOptions = append(grpcServerOptions,
grpc.ChainUnaryInterceptor(newMetricsUnaryInterceptor(), newAuthUnaryInterceptor(config.GRPCAuthNType, grpcAuthorizer)),
grpc.ChainStreamInterceptor(newMetricsStreamInterceptor(), newAuthStreamInterceptor(config.GRPCAuthNType, grpcAuthorizer)))

if config.GRPCAuthNType == "mtls" {
if len(config.ClientCAFile) == 0 {
check(fmt.Errorf("no client CA file specified when using mtls authorization type"), "Can't start gRPC server")
Expand All @@ -102,17 +107,17 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e
tlsConfig.ClientCAs = certPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert

grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.UnaryInterceptor(newAuthUnaryInterceptor(config.GRPCAuthNType, grpcAuthorizer)),
grpc.StreamInterceptor(newAuthStreamInterceptor(config.GRPCAuthNType, grpcAuthorizer)))
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)))
glog.Infof("Serving gRPC service with mTLS at %s", config.ServerBindPort)
} else {
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.UnaryInterceptor(newAuthUnaryInterceptor(config.GRPCAuthNType, grpcAuthorizer)),
grpc.StreamInterceptor(newAuthStreamInterceptor(config.GRPCAuthNType, grpcAuthorizer)))
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)))
glog.Infof("Serving gRPC service with TLS at %s", config.ServerBindPort)
}
} else {
// append metrics interceptor
grpcServerOptions = append(grpcServerOptions,
grpc.UnaryInterceptor(newMetricsUnaryInterceptor()),
grpc.StreamInterceptor(newMetricsStreamInterceptor()))
// Note: Do not use this in production.
glog.Infof("Serving gRPC service without TLS at %s", config.ServerBindPort)
}
Expand Down
242 changes: 242 additions & 0 deletions cmd/maestro/server/metrics_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package server

import (
"context"
"fmt"
"strings"
"time"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1"
grpcprotocol "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol"
)

func init() {
// Register the metrics:
RegisterGRPCMetrics()
}

// NewMetricsUnaryInterceptor creates a unary server interceptor for server metrics.
// Currently supports the Publish method with PublishRequest.
func newMetricsUnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// extract the type from the method name
methodInfo := strings.Split(info.FullMethod, "/")
if len(methodInfo) != 3 || methodInfo[2] != "Publish" {
return nil, fmt.Errorf("invalid method name: %s", info.FullMethod)
}
t := methodInfo[2]
pubReq, ok := req.(*pbv1.PublishRequest)
if !ok {
return nil, fmt.Errorf("invalid request type for Publish method")
}
// convert the request to cloudevent and extract the source
evt, err := binding.ToEvent(ctx, grpcprotocol.NewMessage(pubReq.Event))
if err != nil {
return nil, fmt.Errorf("failed to convert to cloudevent: %v", err)
}
source := evt.Source()
grpcCalledCountMetric.WithLabelValues(t, source).Inc()

grpcMessageReceivedCountMetric.WithLabelValues(t, source).Inc()
startTime := time.Now()
resp, err := handler(ctx, req)
duration := time.Since(startTime).Seconds()
grpcMessageSentCountMetric.WithLabelValues(t, source).Inc()

// get status code from error
status := statusFromError(err)
code := status.Code()
grpcProcessedCountMetric.WithLabelValues(t, source, code.String()).Inc()
grpcProcessedDurationMetric.WithLabelValues(t, source).Observe(duration)

return resp, err
}
}

// wrappedMetricsStream wraps a grpc.ServerStream, capturing the request source
// emitting metrics for the stream interceptor.
type wrappedMetricsStream struct {
t string
source *string
grpc.ServerStream
ctx context.Context
}

// RecvMsg wraps the RecvMsg method of the embedded grpc.ServerStream.
// It captures the source from the SubscriptionRequest and emits metrics.
func (w *wrappedMetricsStream) RecvMsg(m interface{}) error {
err := w.ServerStream.RecvMsg(m)
subReq, ok := m.(*pbv1.SubscriptionRequest)
if !ok {
return fmt.Errorf("invalid request type for Subscribe method")
}
*w.source = subReq.Source
grpcCalledCountMetric.WithLabelValues(w.t, subReq.Source).Inc()
grpcMessageReceivedCountMetric.WithLabelValues(w.t, subReq.Source).Inc()

return err
}

// SendMsg wraps the SendMsg method of the embedded grpc.ServerStream.
func (w *wrappedMetricsStream) SendMsg(m interface{}) error {
err := w.ServerStream.SendMsg(m)
grpcMessageSentCountMetric.WithLabelValues(w.t, *w.source).Inc()
return err
}

// newWrappedMetricsStream creates a wrappedMetricsStream with the specified type and source reference.
func newWrappedMetricsStream(t string, source *string, ctx context.Context, ss grpc.ServerStream) grpc.ServerStream {
return &wrappedMetricsStream{t, source, ss, ctx}
}

// newMetricsStreamInterceptor creates a stream server interceptor for server metrics.
// Currently supports the Subscribe method with SubscriptionRequest.
func newMetricsStreamInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// extract the type from the method name
if !info.IsServerStream || info.IsClientStream {
return fmt.Errorf("invalid stream type for stream method: %s", info.FullMethod)
}
methodInfo := strings.Split(info.FullMethod, "/")
if len(methodInfo) != 3 || methodInfo[2] != "Subscribe" {
return fmt.Errorf("invalid method name for stream method: %s", info.FullMethod)
}
t := methodInfo[2]
source := ""
// create a wrapped stream to capture the source and emit metrics
wrappedMetricsStream := newWrappedMetricsStream(t, &source, stream.Context(), stream)
err := handler(srv, wrappedMetricsStream)

// get status code from error
status := statusFromError(err)
code := status.Code()
grpcProcessedCountMetric.WithLabelValues(t, source, code.String()).Inc()

return err
}
}

// statusFromError returns a grpc status. If the error code is neither a valid grpc status
// nor a context error, codes.Unknown will be set.
func statusFromError(err error) *status.Status {
s, ok := status.FromError(err)
// Mirror what the grpc server itself does, i.e. also convert context errors to status
if !ok {
s = status.FromContextError(err)
}
return s
}

// Subsystem used to define the metrics:
const grpcMetricsSubsystem = "grpc_server"

// Names of the labels added to metrics:
const (
grpcMetricsTypeLabel = "type"
grpcMetricsSourceLabel = "source"
grpcMetricsCodeLabel = "code"
)

// grpcMetricsLabels - Array of labels added to metrics:
var grpcMetricsLabels = []string{
grpcMetricsTypeLabel,
grpcMetricsSourceLabel,
}

// grpcMetricsAllLabels - Array of all labels added to metrics:
var grpcMetricsAllLabels = []string{
grpcMetricsTypeLabel,
grpcMetricsSourceLabel,
grpcMetricsCodeLabel,
}

// Names of the metrics:
const (
calledCountMetric = "called_total"
processedCountMetric = "processed_total"
processedDurationMetric = "processed_duration_seconds"
messageReceivedCountMetric = "message_received_total"
messageSentCountMetric = "message_sent_total"
)

// Register the metrics:
func RegisterGRPCMetrics() {
prometheus.MustRegister(grpcCalledCountMetric)
prometheus.MustRegister(grpcProcessedCountMetric)
prometheus.MustRegister(grpcProcessedDurationMetric)
prometheus.MustRegister(grpcMessageReceivedCountMetric)
prometheus.MustRegister(grpcMessageSentCountMetric)
}

// Unregister the metrics:
func UnregisterGRPCMetrics() {
prometheus.Unregister(grpcCalledCountMetric)
prometheus.Unregister(grpcProcessedCountMetric)
prometheus.Unregister(grpcProcessedDurationMetric)
prometheus.Unregister(grpcMessageReceivedCountMetric)
prometheus.Unregister(grpcMessageSentCountMetric)
}

// Reset the metrics:
func ResetGRPCMetrics() {
grpcCalledCountMetric.Reset()
grpcProcessedCountMetric.Reset()
grpcProcessedDurationMetric.Reset()
grpcMessageReceivedCountMetric.Reset()
grpcMessageSentCountMetric.Reset()
}

// Description of the gRPC called count metric:
var grpcCalledCountMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: grpcMetricsSubsystem,
Name: calledCountMetric,
Help: "Total number of RPCs called on the server.",
},
grpcMetricsLabels,
)

// Description of the gRPC processed count metric:
var grpcProcessedCountMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: grpcMetricsSubsystem,
Name: processedCountMetric,
Help: "Total number of RPCs processed on the server, regardless of success or failure.",
},
grpcMetricsAllLabels,
)

// Description of the gRPC processed duration metric:
var grpcProcessedDurationMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: grpcMetricsSubsystem,
Name: processedDurationMetric,
Help: "Histogram of the duration of RPCs processed on the server.",
Buckets: prometheus.DefBuckets,
},
grpcMetricsLabels,
)

// Description of the gRPC message received count metric:
var grpcMessageReceivedCountMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: grpcMetricsSubsystem,
Name: messageReceivedCountMetric,
Help: "Total number of messages received on the server from agent and client.",
},
grpcMetricsLabels,
)

// Description of the gRPC message sent count metric:
var grpcMessageSentCountMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: grpcMetricsSubsystem,
Name: messageSentCountMetric,
Help: "Total number of messages sent by the server to agent and client.",
},
grpcMetricsLabels,
)
Loading

0 comments on commit 59b13af

Please sign in to comment.