diff --git a/components/metrics/builder.go b/components/metrics/builder.go index c70e4755a..32a10d38a 100644 --- a/components/metrics/builder.go +++ b/components/metrics/builder.go @@ -7,12 +7,27 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -func NewPrometheusMetricsBuilder(prometheusRegistry prometheus.Registerer, namespace string, subsystem string) PrometheusMetricsBuilder { - return PrometheusMetricsBuilder{ - Namespace: namespace, - Subsystem: subsystem, +type PrometheusMetricsBuilderConfig struct { + Namespace string + Subsystem string + AdditionalLabels []MetricLabel +} + +func NewPrometheusMetricsBuilderWithConfig(prometheusRegistry prometheus.Registerer, config PrometheusMetricsBuilderConfig) PrometheusMetricsBuilder { + builder := PrometheusMetricsBuilder{ + Namespace: config.Namespace, + Subsystem: config.Subsystem, PrometheusRegistry: prometheusRegistry, + additionalLabels: config.AdditionalLabels, } + return builder +} + +func NewPrometheusMetricsBuilder(prometheusRegistry prometheus.Registerer, namespace string, subsystem string) PrometheusMetricsBuilder { + return NewPrometheusMetricsBuilderWithConfig(prometheusRegistry, PrometheusMetricsBuilderConfig{ + Namespace: namespace, + Subsystem: subsystem, + }) } // PrometheusMetricsBuilder provides methods to decorate publishers, subscribers and handlers. @@ -22,6 +37,8 @@ type PrometheusMetricsBuilder struct { Namespace string Subsystem string + + additionalLabels []MetricLabel } // AddPrometheusRouterMetrics is a convenience function that acts on the message router to add the metrics middleware @@ -36,8 +53,9 @@ func (b PrometheusMetricsBuilder) AddPrometheusRouterMetrics(r *message.Router) func (b PrometheusMetricsBuilder) DecoratePublisher(pub message.Publisher) (message.Publisher, error) { var err error d := PublisherPrometheusMetricsDecorator{ - pub: pub, - publisherName: internal.StructName(pub), + pub: pub, + publisherName: internal.StructName(pub), + additionalLabels: b.additionalLabels, } d.publishTimeSeconds, err = b.registerHistogramVec(prometheus.NewHistogramVec( @@ -47,7 +65,7 @@ func (b PrometheusMetricsBuilder) DecoratePublisher(pub message.Publisher) (mess Name: "publish_time_seconds", Help: "The time that a publishing attempt (success or not) took in seconds", }, - publisherLabelKeys, + toLabelsSlice(publisherLabelKeys, b.additionalLabels), )) if err != nil { return nil, errors.Wrap(err, "could not register publish time metric") @@ -59,8 +77,9 @@ func (b PrometheusMetricsBuilder) DecoratePublisher(pub message.Publisher) (mess func (b PrometheusMetricsBuilder) DecorateSubscriber(sub message.Subscriber) (message.Subscriber, error) { var err error d := &SubscriberPrometheusMetricsDecorator{ - closing: make(chan struct{}), - subscriberName: internal.StructName(sub), + closing: make(chan struct{}), + subscriberName: internal.StructName(sub), + additionalLabels: b.additionalLabels, } d.subscriberMessagesReceivedTotal, err = b.registerCounterVec(prometheus.NewCounterVec( @@ -70,7 +89,7 @@ func (b PrometheusMetricsBuilder) DecorateSubscriber(sub message.Subscriber) (me Name: "subscriber_messages_received_total", Help: "The total number of messages received by the subscriber", }, - append(subscriberLabelKeys, labelAcked), + toLabelsSlice(append(subscriberLabelKeys, labelAcked), b.additionalLabels), )) if err != nil { return nil, errors.Wrap(err, "could not register time to ack metric") diff --git a/components/metrics/handler.go b/components/metrics/handler.go index b9240426f..aa983a6e9 100644 --- a/components/metrics/handler.go +++ b/components/metrics/handler.go @@ -35,6 +35,7 @@ var ( // HandlerPrometheusMetricsMiddleware is a middleware that captures Prometheus metrics. type HandlerPrometheusMetricsMiddleware struct { handlerExecutionTimeSeconds *prometheus.HistogramVec + additionalLabels []MetricLabel } // Middleware returns the middleware ready to be used with watermill's Router. @@ -45,6 +46,9 @@ func (m HandlerPrometheusMetricsMiddleware) Middleware(h message.HandlerFunc) me labels := prometheus.Labels{ labelKeyHandlerName: message.HandlerNameFromCtx(ctx), } + for _, lb := range m.additionalLabels { + labels[lb.Label] = lb.ComputeFn(ctx) + } defer func() { if err != nil { @@ -62,7 +66,9 @@ func (m HandlerPrometheusMetricsMiddleware) Middleware(h message.HandlerFunc) me // NewRouterMiddleware returns new middleware. func (b PrometheusMetricsBuilder) NewRouterMiddleware() HandlerPrometheusMetricsMiddleware { var err error - m := HandlerPrometheusMetricsMiddleware{} + m := HandlerPrometheusMetricsMiddleware{ + additionalLabels: b.additionalLabels, + } m.handlerExecutionTimeSeconds, err = b.registerHistogramVec(prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -72,7 +78,7 @@ func (b PrometheusMetricsBuilder) NewRouterMiddleware() HandlerPrometheusMetrics Help: "The total time elapsed while executing the handler function in seconds", Buckets: handlerExecutionTimeBuckets, }, - handlerLabelKeys, + toLabelsSlice(handlerLabelKeys, b.additionalLabels), )) if err != nil { panic(errors.Wrap(err, "could not register handler execution time metric")) diff --git a/components/metrics/labels.go b/components/metrics/labels.go index 6b928c9c0..f5acf7892 100644 --- a/components/metrics/labels.go +++ b/components/metrics/labels.go @@ -46,3 +46,30 @@ func labelsFromCtx(ctx context.Context, labels ...string) prometheus.Labels { return ctxLabels } + +type LabelComputeFn func(msgCtx context.Context) string + +type MetricLabel struct { + Label string + ComputeFn LabelComputeFn +} + +func toLabelsSlice(baseLabels []string, customs []MetricLabel) []string { + labels := make([]string, len(baseLabels), len(baseLabels)+len(customs)) + copy(labels, baseLabels) + for _, label := range customs { + //Check if the additional label is already in the base labels. We cannot have duplicate labels + //If it's in the base, just skip it as the compute function is going to overwrite the default value + contains := false + for _, baseLabel := range baseLabels { + if baseLabel == label.Label { + contains = true + break + } + } + if !contains { + labels = append(labels, label.Label) + } + } + return labels +} diff --git a/components/metrics/publisher.go b/components/metrics/publisher.go index 2110429b0..55f433bf4 100644 --- a/components/metrics/publisher.go +++ b/components/metrics/publisher.go @@ -20,6 +20,7 @@ type PublisherPrometheusMetricsDecorator struct { pub message.Publisher publisherName string publishTimeSeconds *prometheus.HistogramVec + additionalLabels []MetricLabel } // Publish updates the relevant publisher metrics and calls the wrapped publisher's Publish. @@ -37,6 +38,9 @@ func (m PublisherPrometheusMetricsDecorator) Publish(topic string, messages ...* if labels[labelKeyHandlerName] == "" { labels[labelKeyHandlerName] = labelValueNoHandler } + for _, lb := range m.additionalLabels { + labels[lb.Label] = lb.ComputeFn(ctx) + } start := time.Now() defer func() { diff --git a/components/metrics/subscriber.go b/components/metrics/subscriber.go index c439a53a4..bb1569432 100644 --- a/components/metrics/subscriber.go +++ b/components/metrics/subscriber.go @@ -18,6 +18,7 @@ type SubscriberPrometheusMetricsDecorator struct { subscriberName string subscriberMessagesReceivedTotal *prometheus.CounterVec closing chan struct{} + additionalLabels []MetricLabel } func (s SubscriberPrometheusMetricsDecorator) recordMetrics(msg *message.Message) { @@ -33,6 +34,9 @@ func (s SubscriberPrometheusMetricsDecorator) recordMetrics(msg *message.Message if labels[labelKeyHandlerName] == "" { labels[labelKeyHandlerName] = labelValueNoHandler } + for _, lb := range s.additionalLabels { + labels[lb.Label] = lb.ComputeFn(ctx) + } go func() { if subscribeAlreadyObserved(ctx) {