From 4377caf18ccbfa42f54dad84173062aff0ccac95 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Erbrech?=
Date: Wed, 14 Dec 2022 08:52:25 +1100
Subject: [PATCH] implement log and metrics integration support (#89)

* metrics port

* metrics tests
---
 v2/e2e/suite_test.go        |   4 +
 v2/metrics/registry.go      | 180 ++++++++++++++++++++++++++++++++++++
 v2/metrics/registry_test.go |  86 +++++++++++++++++
 v2/processor.go             |  69 +++++++++-----
 4 files changed, 317 insertions(+), 22 deletions(-)
 create mode 100644 v2/metrics/registry.go
 create mode 100644 v2/metrics/registry_test.go

diff --git a/v2/e2e/suite_test.go b/v2/e2e/suite_test.go
index 2e859ee5..eb238700 100644
--- a/v2/e2e/suite_test.go
+++ b/v2/e2e/suite_test.go
@@ -62,6 +62,10 @@ func randomString(prefix string, length int) string {
 }
 
 func TestSuite(t *testing.T) {
+	t.Helper()
+	if os.Getenv("INTEGRATION") == "" {
+		t.Skip("skipping integration tests, set environment variable INTEGRATION")
+	}
 	suite.Run(t, &SBSuite{Prefix: "v5"})
 }
 
diff --git a/v2/metrics/registry.go b/v2/metrics/registry.go
new file mode 100644
index 00000000..3d10b903
--- /dev/null
+++ b/v2/metrics/registry.go
@@ -0,0 +1,180 @@
+// Package metrics allows configuring, recording and reading go-shuttle metrics
+package metrics
+
+import (
+	"fmt"
+	"strconv"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
+	prom "github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
+)
+
+const (
+	subsystem          = "goshuttle_handler"
+	messageTypeLabel   = "messageType"
+	deliveryCountLabel = "deliveryCount"
+	successLabel       = "success"
+)
+
+var (
+	metricsRegistry = newRegistry()
+	// Processor exposes a Recorder interface to manipulate the Processor metrics.
+	Processor Recorder = metricsRegistry
+)
+
+func newRegistry() *Registry {
+	return &Registry{
+		MessageReceivedCount: prom.NewCounterVec(prom.CounterOpts{
+			Name:      "message_received_total",
+			Help:      "total number of messages received by the processor",
+			Subsystem: subsystem,
+		}, []string{}),
+		MessageHandledCount: prom.NewCounterVec(prom.CounterOpts{
+			Name:      "message_handled_total",
+			Help:      "total number of messages handled by this handler",
+			Subsystem: subsystem,
+		}, []string{messageTypeLabel, deliveryCountLabel}),
+		MessageLockRenewedCount: prom.NewCounterVec(prom.CounterOpts{
+			Name:      "message_lock_renewed_total",
+			Help:      "total number of message lock renewals",
+			Subsystem: subsystem,
+		}, []string{messageTypeLabel, successLabel}),
+		MessageDeadlineReachedCount: prom.NewCounterVec(prom.CounterOpts{
+			Name:      "message_deadline_reached_total",
+			Help:      "total number of messages that reached their context deadline",
+			Subsystem: subsystem,
+		}, []string{messageTypeLabel}),
+		ConcurrentMessageCount: prom.NewGaugeVec(prom.GaugeOpts{
+			Name:      "concurrent_message_count",
+			Help:      "number of messages being handled concurrently",
+			Subsystem: subsystem,
+		}, []string{messageTypeLabel}),
+	}
+}
+
+func getMessageTypeLabel(msg *azservicebus.ReceivedMessage) prom.Labels {
+	typeName := msg.ApplicationProperties["type"]
+	return map[string]string{
+		messageTypeLabel: fmt.Sprintf("%s", typeName),
+	}
+}
+
+func (m *Registry) Init(reg prom.Registerer) {
+	reg.MustRegister(
+		m.MessageReceivedCount,
+		m.MessageHandledCount,
+		m.MessageLockRenewedCount,
+		m.MessageDeadlineReachedCount,
+		m.ConcurrentMessageCount)
+}
+
+type Registry struct {
+	MessageReceivedCount        *prom.CounterVec
+	MessageHandledCount         *prom.CounterVec
+	MessageLockRenewedCount     *prom.CounterVec
+	MessageDeadlineReachedCount *prom.CounterVec
+	ConcurrentMessageCount      *prom.GaugeVec
+}
+
+// Recorder allows initializing the metric registry and increasing/decreasing the registered metrics at runtime.
+type Recorder interface {
+	Init(registerer prom.Registerer)
+	IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage)
+	IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage)
+	IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage)
+	DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage)
+	IncMessageHandled(msg *azservicebus.ReceivedMessage)
+	IncMessageReceived(float64)
+	IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage)
+}
+
+// IncMessageLockRenewedSuccess increases the message lock renewal success counter
+func (m *Registry) IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage) {
+	labels := getMessageTypeLabel(msg)
+	labels[successLabel] = "true"
+	m.MessageLockRenewedCount.With(labels).Inc()
+}
+
+// IncMessageLockRenewedFailure increases the message lock renewal failure counter
+func (m *Registry) IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage) {
+	labels := getMessageTypeLabel(msg)
+	labels[successLabel] = "false"
+	m.MessageLockRenewedCount.With(labels).Inc()
+}
+
+// IncMessageHandled increases the message handled counter
+func (m *Registry) IncMessageHandled(msg *azservicebus.ReceivedMessage) {
+	labels := getMessageTypeLabel(msg)
+	labels[deliveryCountLabel] = strconv.FormatUint(uint64(msg.DeliveryCount), 10)
+	m.MessageHandledCount.With(labels).Inc()
+}
+
+// IncConcurrentMessageCount increases the concurrent message counter
+func (m *Registry) IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage) {
+	m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Inc()
+}
+
+// DecConcurrentMessageCount decreases the concurrent message counter
+func (m *Registry) DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage) {
+	m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Dec()
+}
+
+// IncMessageDeadlineReachedCount increases the message deadline reached counter
+func (m *Registry) IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage) {
+	labels := getMessageTypeLabel(msg)
+	m.MessageDeadlineReachedCount.With(labels).Inc()
+}
+
+// IncMessageReceived increases the message received counter
+func (m *Registry) IncMessageReceived(count float64) {
+	m.MessageReceivedCount.With(map[string]string{}).Add(count)
+}
+
+// Informer allows inspecting metric values stored in the registry at runtime
+type Informer struct {
+	registry *Registry
+}
+
+// NewInformer creates an Informer for the current registry
+func NewInformer() *Informer {
+	return &Informer{registry: metricsRegistry}
+}
+
+// GetMessageLockRenewedFailureCount retrieves the current count of failed message lock renewals
+func (i *Informer) GetMessageLockRenewedFailureCount() (float64, error) {
+	var total float64
+	collect(i.registry.MessageLockRenewedCount, func(m dto.Metric) {
+		if !hasLabel(m, successLabel, "false") {
+			return
+		}
+		total += m.GetCounter().GetValue()
+	})
+	return total, nil
+}
+
+func hasLabel(m dto.Metric, key string, value string) bool {
+	for _, pair := range m.Label {
+		if pair == nil {
+			continue
+		}
+		if pair.GetName() == key && pair.GetValue() == value {
+			return true
+		}
+	}
+	return false
+}
+
+// collect calls the function for each metric associated with the Collector
+func collect(col prom.Collector, do func(dto.Metric)) {
+	c := make(chan prom.Metric)
+	go func(c chan prom.Metric) {
+		col.Collect(c)
+		close(c)
+	}(c)
+	for x := range c { // e.g. range across distinct label vector values
+		m := dto.Metric{}
+		_ = x.Write(&m)
+		do(m)
+	}
+}
diff --git a/v2/metrics/registry_test.go b/v2/metrics/registry_test.go
new file mode 100644
index 00000000..feb4bf4a
--- /dev/null
+++ b/v2/metrics/registry_test.go
@@ -0,0 +1,86 @@
+package metrics
+
+import (
+	"testing"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
+	. "github.com/onsi/gomega"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type fakeRegistry struct {
+	collectors []prometheus.Collector
+}
+
+func (f *fakeRegistry) Register(c prometheus.Collector) error {
+	panic("implement me")
+}
+
+func (f *fakeRegistry) MustRegister(c ...prometheus.Collector) {
+	f.collectors = append(f.collectors, c...)
+}
+
+func (f *fakeRegistry) Unregister(c prometheus.Collector) bool {
+	panic("implement me")
+}
+
+func TestRegistry_Init(t *testing.T) {
+	g := NewWithT(t)
+	r := newRegistry()
+	fRegistry := &fakeRegistry{}
+	g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic())
+	g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic())
+	g.Expect(fRegistry.collectors).To(HaveLen(5))
+	Processor.IncMessageReceived(10)
+
+}
+
+func TestMetrics(t *testing.T) {
+	type testcase struct {
+		name string
+		msg  *azservicebus.ReceivedMessage
+	}
+	for _, tc := range []testcase{
+		{
+			name: "no type property",
+			msg:  &azservicebus.ReceivedMessage{},
+		},
+		{
+			name: "with type property",
+			msg: &azservicebus.ReceivedMessage{
+				ApplicationProperties: map[string]interface{}{
+					"type": "someType",
+				},
+			},
+		},
+	} {
+		g := NewWithT(t)
+		r := newRegistry()
+		registerer := prometheus.NewRegistry()
+		informer := &Informer{registry: r}
+
+		// before init
+		count, err := informer.GetMessageLockRenewedFailureCount()
+		g.Expect(err).ToNot(HaveOccurred())
+		g.Expect(count).To(Equal(float64(0)))
+
+		// after init, count 0
+		g.Expect(func() { r.Init(registerer) }).ToNot(Panic())
+		count, err = informer.GetMessageLockRenewedFailureCount()
+		g.Expect(err).ToNot(HaveOccurred())
+		g.Expect(count).To(Equal(float64(0)))
+
+		// count incremented
+		r.IncMessageLockRenewedFailure(tc.msg)
+		count, err = informer.GetMessageLockRenewedFailureCount()
+		g.Expect(err).ToNot(HaveOccurred())
+		g.Expect(count).To(Equal(float64(1)))
+
+		// count failure only
+		r.IncMessageLockRenewedSuccess(tc.msg)
+		count, err = informer.GetMessageLockRenewedFailureCount()
+		g.Expect(err).ToNot(HaveOccurred())
+		g.Expect(count).To(Equal(float64(1)))
+	}
+
+}
diff --git a/v2/processor.go b/v2/processor.go
index 792fbd69..e3d27092 100644
--- a/v2/processor.go
+++ b/v2/processor.go
@@ -9,6 +9,8 @@ import (
 
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
+	"github.com/Azure/go-shuttle/v2/metrics"
+	"github.com/devigned/tab"
 )
 
 type Receiver interface {
@@ -46,9 +48,8 @@ type Processor struct {
 // MaxConcurrency defaults to 1. Not setting MaxConcurrency, or setting it to 0 or a negative value will fallback to the default.
 // ReceiveInterval defaults to 2 seconds if not set.
 type ProcessorOptions struct {
-	MaxConcurrency    int
-	ReceiveInterval   *time.Duration
-	EnrichContextFunc func(ctx context.Context, message *azservicebus.ReceivedMessage)
+	MaxConcurrency  int
+	ReceiveInterval *time.Duration
 }
 
 func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor {
@@ -75,7 +76,8 @@ func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOpti
 // Start starts the processor and blocks until an error occurs or the context is canceled.
 func (p *Processor) Start(ctx context.Context) error {
 	messages, err := p.receiver.ReceiveMessages(ctx, p.options.MaxConcurrency, nil)
-	log("received ", len(messages), " messages - initial")
+	log(ctx, "received ", len(messages), " messages - initial")
+	metrics.Processor.IncMessageReceived(float64(len(messages)))
 	if err != nil {
 		return err
 	}
@@ -90,7 +92,8 @@ func (p *Processor) Start(ctx context.Context) error {
 				break
 			}
 			messages, err := p.receiver.ReceiveMessages(ctx, maxMessages, nil)
-			log("received ", len(messages), " messages from loop")
+			log(ctx, "received ", len(messages), " messages from loop")
+			metrics.Processor.IncMessageReceived(float64(len(messages)))
 			if err != nil {
 				return err
 			}
@@ -98,11 +101,11 @@ func (p *Processor) Start(ctx context.Context) error {
 				p.process(ctx, msg)
 			}
 		case <-ctx.Done():
-			log("context done, stop receiving")
+			log(ctx, "context done, stop receiving")
 			break
 		}
 	}
-	log("exiting processor")
+	log(ctx, "exiting processor")
 	return ctx.Err()
 }
 
@@ -113,7 +116,10 @@ func (p *Processor) process(ctx context.Context, message *azservicebus.ReceivedM
 		defer cancel()
 		defer func() {
 			<-p.concurrencyTokens
+			metrics.Processor.IncMessageHandled(message)
+			metrics.Processor.DecConcurrentMessageCount(message)
 		}()
+		metrics.Processor.IncConcurrentMessageCount(message)
 		p.handle(msgContext, p.receiver, message)
 	}()
 }
@@ -151,41 +157,60 @@ type peekLockRenewer struct {
 }
 
 func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *azservicebus.ReceivedMessage) {
-	// _, span := tracing.StartSpanFromMessageAndContext(ctx, "go-shuttle.peeklock.startPeriodicRenewal", message)
-	// defer span.End()
 	count := 0
 	for alive := true; alive; {
 		select {
 		case <-time.After(*plr.renewalInterval):
-			log("renewing lock")
+			log(ctx, "renewing lock")
 			count++
-			// tab.For(ctx).Debug("Renewing message lock", tab.Int64Attribute("count", int64(count)))
+			tab.For(ctx).Debug("Renewing message lock", tab.Int64Attribute("count", int64(count)))
 			err := plr.lockRenewer.RenewMessageLock(ctx, message, nil)
 			if err != nil {
-				log("ERROR failed to renew lock")
-				// listener.Metrics.IncMessageLockRenewedFailure(message)
+				log(ctx, "failed to renew lock: ", err)
+				metrics.Processor.IncMessageLockRenewedFailure(message)
 				// I don't think this is a problem. the context is canceled when the message processing is over.
 				// this can happen if we already entered the interval case when the message is completing.
-				// tab.For(ctx).Info("failed to renew the peek lock", tab.StringAttribute("reason", err.Error()))
+				tab.For(ctx).Error(fmt.Errorf("failed to renew lock: %w", err))
 				return
 			}
-			// tab.For(ctx).Debug("renewed lock success")
-			// listener.Metrics.IncMessageLockRenewedSuccess(message)
+			tab.For(ctx).Debug("renewed lock success")
+			metrics.Processor.IncMessageLockRenewedSuccess(message)
 		case <-ctx.Done():
-			log("Context Done, stop lock renewal")
-			// tab.For(ctx).Info("Stopping periodic renewal")
+			log(ctx, "context done: stopping periodic renewal")
+			tab.For(ctx).Info("stopping periodic renewal")
 			err := ctx.Err()
 			if errors.Is(err, context.DeadlineExceeded) {
-				// listener.Metrics.IncMessageDeadlineReachedCount(message)
+				metrics.Processor.IncMessageDeadlineReachedCount(message)
 			}
 			alive = false
 		}
 	}
 }
 
-// dumb log until we integrate logging
-func log(a ...any) {
+type Logger interface {
+	Info(s string)
+	Warn(s string)
+	Error(s string)
+}
+
+var getLogger = func(_ context.Context) Logger { return &printLogger{} }
+
+type printLogger struct{}
+
+func (l *printLogger) Info(s string) {
+	fmt.Println(append(append([]any{}, "INFO - ", time.Now().UTC(), " - "), s)...)
+}
+
+func (l *printLogger) Warn(s string) {
+	fmt.Println(append(append([]any{}, "WARN - ", time.Now().UTC(), " - "), s)...)
+}
+
+func (l *printLogger) Error(s string) {
+	fmt.Println(append(append([]any{}, "ERROR - ", time.Now().UTC(), " - "), s)...)
+}
+
+func log(ctx context.Context, a ...any) {
 	if os.Getenv("GOSHUTTLE_LOG") == "ALL" {
-		fmt.Println(append(append([]any{}, time.Now().UTC(), " - "), a...)...)
+		getLogger(ctx).Info(fmt.Sprint(append(append([]any{}, time.Now().UTC(), " - "), a...)...))
 	}
 }
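
Usage note (not part of the patch): a minimal sketch of how an application consuming this change might register and read the new processor metrics. Only metrics.Processor.Init, metrics.NewInformer and GetMessageLockRenewedFailureCount come from the code above; the promhttp handler, the listen address and the main packaging are illustrative assumptions.

package main

import (
	"fmt"
	"net/http"

	"github.com/Azure/go-shuttle/v2/metrics"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Register the go-shuttle processor collectors introduced in this patch.
	metrics.Processor.Init(prometheus.DefaultRegisterer)

	// Expose them for scraping (promhttp and the port are assumptions).
	http.Handle("/metrics", promhttp.Handler())
	go func() { _ = http.ListenAndServe(":9090", nil) }()

	// Read a value back in-process through the Informer added above.
	informer := metrics.NewInformer()
	failures, err := informer.GetMessageLockRenewedFailureCount()
	if err != nil {
		panic(err)
	}
	fmt.Println("lock renewal failures:", failures)
}

Init delegates to MustRegister, so a given Registry should only be registered once per Registerer; the tests above create a fresh prometheus.NewRegistry() for each case for that reason.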
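
Logging note (also not part of the patch): the new package-level log helper only writes when the GOSHUTTLE_LOG environment variable is set to "ALL", and it goes through the Logger interface with printLogger as the default. getLogger stays package-private in this change, so a custom implementation cannot be plugged in yet; the sketch below only mirrors the interface locally to show what an application-side logger satisfying the same contract could look like (the appLogger type is hypothetical).

package main

import (
	"log"
	"os"
)

// Logger mirrors the interface added to v2/processor.go in this patch.
type Logger interface {
	Info(s string)
	Warn(s string)
	Error(s string)
}

// appLogger is a hypothetical implementation backed by the standard library logger.
type appLogger struct{}

func (appLogger) Info(s string)  { log.Println("INFO ", s) }
func (appLogger) Warn(s string)  { log.Println("WARN ", s) }
func (appLogger) Error(s string) { log.Println("ERROR ", s) }

func main() {
	// go-shuttle's log() helper is gated on this environment variable.
	_ = os.Setenv("GOSHUTTLE_LOG", "ALL")

	var l Logger = appLogger{}
	l.Info("processor starting")
}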