diff --git a/v2/lockrenewer.go b/v2/lockrenewer.go index e527aeed..67ff7bdf 100644 --- a/v2/lockrenewer.go +++ b/v2/lockrenewer.go @@ -9,10 +9,9 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/Azure/go-shuttle/v2/metrics/processor" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - - "github.com/Azure/go-shuttle/v2/metrics" ) // LockRenewer abstracts the servicebus receiver client to only expose lock renewal @@ -125,7 +124,7 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a err := plr.lockRenewer.RenewMessageLock(ctx, message, nil) if err != nil { log(ctx, fmt.Sprintf("failed to renew lock: %s", err)) - metrics.Processor.IncMessageLockRenewedFailure(message) + processor.Metric.IncMessageLockRenewedFailure(message) // The context is canceled when the message handler returns from the processor. // This can happen if we already entered the interval case when the message processing completes. // The best we can do is log and retry on the next tick. The sdk already retries operations on recoverable network errors. @@ -141,14 +140,14 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a continue } span.AddEvent("message lock renewed", trace.WithAttributes(attribute.Int("count", count))) - metrics.Processor.IncMessageLockRenewedSuccess(message) + processor.Metric.IncMessageLockRenewedSuccess(message) case <-ctx.Done(): log(ctx, "context done: stopping periodic renewal") span.AddEvent("context done: stopping message lock renewal") err := ctx.Err() if errors.Is(err, context.DeadlineExceeded) { span.RecordError(err) - metrics.Processor.IncMessageDeadlineReachedCount(message) + processor.Metric.IncMessageDeadlineReachedCount(message) } plr.stop(ctx) case <-plr.stopped: diff --git a/v2/metrics/processor/types.go b/v2/metrics/processor/types.go new file mode 100644 index 00000000..e1b1f98b --- /dev/null +++ b/v2/metrics/processor/types.go @@ -0,0 +1,179 @@ +package processor + +import ( + "fmt" + "strconv" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + prom "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +const ( + subsystem = "goshuttle_handler" + messageTypeLabel = "messageType" + deliveryCountLabel = "deliveryCount" + successLabel = "success" +) + +var ( + metricsRegistry = newRegistry() + // Processor exposes a Recorder interface to manipulate the Processor metrics. + Metric Recorder = metricsRegistry +) + +func newRegistry() *Registry { + return &Registry{ + MessageReceivedCount: prom.NewCounterVec(prom.CounterOpts{ + Name: "message_received_total", + Help: "total number of messages received by the processor", + Subsystem: subsystem, + }, []string{}), + MessageHandledCount: prom.NewCounterVec(prom.CounterOpts{ + Name: "message_handled_total", + Help: "total number of messages handled by this handler", + Subsystem: subsystem, + }, []string{messageTypeLabel, deliveryCountLabel}), + MessageLockRenewedCount: prom.NewCounterVec(prom.CounterOpts{ + Name: "message_lock_renewed_total", + Help: "total number of message lock renewal", + Subsystem: subsystem, + }, []string{messageTypeLabel, successLabel}), + MessageDeadlineReachedCount: prom.NewCounterVec(prom.CounterOpts{ + Name: "message_deadline_reached_total", + Help: "total number of message lock renewal", + Subsystem: subsystem, + }, []string{messageTypeLabel}), + ConcurrentMessageCount: prom.NewGaugeVec(prom.GaugeOpts{ + Name: "concurrent_message_count", + Help: "number of messages being handled concurrently", + Subsystem: subsystem, + }, []string{messageTypeLabel}), + } +} + +func getMessageTypeLabel(msg *azservicebus.ReceivedMessage) prom.Labels { + typeName := msg.ApplicationProperties["type"] + return map[string]string{ + messageTypeLabel: fmt.Sprintf("%s", typeName), + } +} + +func (m *Registry) Init(reg prom.Registerer) { + reg.MustRegister( + m.MessageReceivedCount, + m.MessageHandledCount, + m.MessageLockRenewedCount, + m.MessageDeadlineReachedCount, + m.ConcurrentMessageCount) +} + +type Registry struct { + MessageReceivedCount *prom.CounterVec + MessageHandledCount *prom.CounterVec + MessageLockRenewedCount *prom.CounterVec + MessageDeadlineReachedCount *prom.CounterVec + ConcurrentMessageCount *prom.GaugeVec +} + +// Recorder allows to initialize the metric registry and increase/decrease the registered metrics at runtime. +type Recorder interface { + Init(registerer prom.Registerer) + IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage) + IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage) + IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage) + DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage) + IncMessageHandled(msg *azservicebus.ReceivedMessage) + IncMessageReceived(float64) + IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage) +} + +// IncMessageLockRenewedSuccess increase the message lock renewal success counter +func (m *Registry) IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage) { + labels := getMessageTypeLabel(msg) + labels[successLabel] = "true" + m.MessageLockRenewedCount.With(labels).Inc() +} + +// IncMessageLockRenewedFailure increase the message lock renewal failure counter +func (m *Registry) IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage) { + labels := getMessageTypeLabel(msg) + labels[successLabel] = "false" + m.MessageLockRenewedCount.With(labels).Inc() +} + +// IncMessageHandled increase the message Handled +func (m *Registry) IncMessageHandled(msg *azservicebus.ReceivedMessage) { + labels := getMessageTypeLabel(msg) + labels[deliveryCountLabel] = strconv.FormatUint(uint64(msg.DeliveryCount), 10) + m.MessageHandledCount.With(labels).Inc() +} + +// IncConcurrentMessageCount increases the concurrent message counter +func (m *Registry) IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage) { + m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Inc() +} + +// DecConcurrentMessageCount decreases the concurrent message counter +func (m *Registry) DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage) { + m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Dec() +} + +// IncMessageDeadlineReachedCount increases the message deadline reached counter +func (m *Registry) IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage) { + labels := getMessageTypeLabel(msg) + m.MessageDeadlineReachedCount.With(labels).Inc() +} + +// IncMessageReceived increases the message received counter +func (m *Registry) IncMessageReceived(count float64) { + m.MessageReceivedCount.With(map[string]string{}).Add(count) +} + +// Informer allows to inspect metrics value stored in the registry at runtime +type Informer struct { + registry *Registry +} + +// NewInformer creates an Informer for the current registry +func NewInformer() *Informer { + return &Informer{registry: metricsRegistry} +} + +// GetMessageLockRenewedFailureCount retrieves the current value of the MessageLockRenewedFailureCount metric +func (i *Informer) GetMessageLockRenewedFailureCount() (float64, error) { + var total float64 + collect(i.registry.MessageLockRenewedCount, func(m dto.Metric) { + if !hasLabel(m, successLabel, "false") { + return + } + total += m.GetCounter().GetValue() + }) + return total, nil +} + +func hasLabel(m dto.Metric, key string, value string) bool { + for _, pair := range m.Label { + if pair == nil { + continue + } + if pair.GetName() == key && pair.GetValue() == value { + return true + } + } + return false +} + +// collect calls the function for each metric associated with the Collector +func collect(col prom.Collector, do func(dto.Metric)) { + c := make(chan prom.Metric) + go func(c chan prom.Metric) { + col.Collect(c) + close(c) + }(c) + for x := range c { // eg range across distinct label vector values + m := dto.Metric{} + _ = x.Write(&m) + do(m) + } +} diff --git a/v2/metrics/processor/types_test.go b/v2/metrics/processor/types_test.go new file mode 100644 index 00000000..18830f4d --- /dev/null +++ b/v2/metrics/processor/types_test.go @@ -0,0 +1,86 @@ +package processor + +import ( + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + . "github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" +) + +type fakeRegistry struct { + collectors []prometheus.Collector +} + +func (f *fakeRegistry) Register(c prometheus.Collector) error { + panic("implement me") +} + +func (f *fakeRegistry) MustRegister(c ...prometheus.Collector) { + f.collectors = append(f.collectors, c...) +} + +func (f *fakeRegistry) Unregister(c prometheus.Collector) bool { + panic("implement me") +} + +func TestRegistry_Init(t *testing.T) { + g := NewWithT(t) + r := newRegistry() + fRegistry := &fakeRegistry{} + g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic()) + g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic()) + g.Expect(fRegistry.collectors).To(HaveLen(5)) + Metric.IncMessageReceived(10) + +} + +func TestMetrics(t *testing.T) { + type testcase struct { + name string + msg *azservicebus.ReceivedMessage + } + for _, tc := range []testcase{ + { + name: "no type property", + msg: &azservicebus.ReceivedMessage{}, + }, + { + name: "with type property", + msg: &azservicebus.ReceivedMessage{ + ApplicationProperties: map[string]interface{}{ + "type": "someType", + }, + }, + }, + } { + g := NewWithT(t) + r := newRegistry() + registerer := prometheus.NewRegistry() + informer := &Informer{registry: r} + + // before init + count, err := informer.GetMessageLockRenewedFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(0))) + + // after init, count 0 + g.Expect(func() { r.Init(registerer) }).ToNot(Panic()) + count, err = informer.GetMessageLockRenewedFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(0))) + + // count incremented + r.IncMessageLockRenewedFailure(tc.msg) + count, err = informer.GetMessageLockRenewedFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(1))) + + // count failure only + r.IncMessageLockRenewedSuccess(tc.msg) + count, err = informer.GetMessageLockRenewedFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(1))) + } + +} diff --git a/v2/metrics/registry.go b/v2/metrics/registry.go index ff6d46da..374cc3bf 100644 --- a/v2/metrics/registry.go +++ b/v2/metrics/registry.go @@ -2,184 +2,13 @@ package metrics import ( - "fmt" - "strconv" - - "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/Azure/go-shuttle/v2/metrics/processor" + "github.com/Azure/go-shuttle/v2/metrics/sender" prom "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" -) - -const ( - subsystem = "goshuttle_handler" - messageTypeLabel = "messageType" - deliveryCountLabel = "deliveryCount" - successLabel = "success" -) - -var ( - metricsRegistry = newRegistry() - // Processor exposes a Recorder interface to manipulate the Processor metrics. - Processor Recorder = metricsRegistry ) // Register registers the go shuttle metrics with the given prometheus registerer. func Register(reg prom.Registerer) { - metricsRegistry.Init(reg) -} - -func newRegistry() *Registry { - return &Registry{ - MessageReceivedCount: prom.NewCounterVec(prom.CounterOpts{ - Name: "message_received_total", - Help: "total number of messages received by the processor", - Subsystem: subsystem, - }, []string{}), - MessageHandledCount: prom.NewCounterVec(prom.CounterOpts{ - Name: "message_handled_total", - Help: "total number of messages handled by this handler", - Subsystem: subsystem, - }, []string{messageTypeLabel, deliveryCountLabel}), - MessageLockRenewedCount: prom.NewCounterVec(prom.CounterOpts{ - Name: "message_lock_renewed_total", - Help: "total number of message lock renewal", - Subsystem: subsystem, - }, []string{messageTypeLabel, successLabel}), - MessageDeadlineReachedCount: prom.NewCounterVec(prom.CounterOpts{ - Name: "message_deadline_reached_total", - Help: "total number of message lock renewal", - Subsystem: subsystem, - }, []string{messageTypeLabel}), - ConcurrentMessageCount: prom.NewGaugeVec(prom.GaugeOpts{ - Name: "concurrent_message_count", - Help: "number of messages being handled concurrently", - Subsystem: subsystem, - }, []string{messageTypeLabel}), - } -} - -func getMessageTypeLabel(msg *azservicebus.ReceivedMessage) prom.Labels { - typeName := msg.ApplicationProperties["type"] - return map[string]string{ - messageTypeLabel: fmt.Sprintf("%s", typeName), - } -} - -func (m *Registry) Init(reg prom.Registerer) { - reg.MustRegister( - m.MessageReceivedCount, - m.MessageHandledCount, - m.MessageLockRenewedCount, - m.MessageDeadlineReachedCount, - m.ConcurrentMessageCount) -} - -type Registry struct { - MessageReceivedCount *prom.CounterVec - MessageHandledCount *prom.CounterVec - MessageLockRenewedCount *prom.CounterVec - MessageDeadlineReachedCount *prom.CounterVec - ConcurrentMessageCount *prom.GaugeVec -} - -// Recorder allows to initialize the metric registry and increase/decrease the registered metrics at runtime. -type Recorder interface { - Init(registerer prom.Registerer) - IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage) - IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage) - IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage) - DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage) - IncMessageHandled(msg *azservicebus.ReceivedMessage) - IncMessageReceived(float64) - IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage) -} - -// IncMessageLockRenewedSuccess increase the message lock renewal success counter -func (m *Registry) IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage) { - labels := getMessageTypeLabel(msg) - labels[successLabel] = "true" - m.MessageLockRenewedCount.With(labels).Inc() -} - -// IncMessageLockRenewedFailure increase the message lock renewal failure counter -func (m *Registry) IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage) { - labels := getMessageTypeLabel(msg) - labels[successLabel] = "false" - m.MessageLockRenewedCount.With(labels).Inc() -} - -// IncMessageHandled increase the message Handled -func (m *Registry) IncMessageHandled(msg *azservicebus.ReceivedMessage) { - labels := getMessageTypeLabel(msg) - labels[deliveryCountLabel] = strconv.FormatUint(uint64(msg.DeliveryCount), 10) - m.MessageHandledCount.With(labels).Inc() -} - -// IncConcurrentMessageCount increases the concurrent message counter -func (m *Registry) IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage) { - m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Inc() -} - -// DecConcurrentMessageCount decreases the concurrent message counter -func (m *Registry) DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage) { - m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Dec() -} - -// IncMessageDeadlineReachedCount increases the message deadline reached counter -func (m *Registry) IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage) { - labels := getMessageTypeLabel(msg) - m.MessageDeadlineReachedCount.With(labels).Inc() -} - -// IncMessageReceived increases the message received counter -func (m *Registry) IncMessageReceived(count float64) { - m.MessageReceivedCount.With(map[string]string{}).Add(count) -} - -// Informer allows to inspect metrics value stored in the registry at runtime -type Informer struct { - registry *Registry -} - -// NewInformer creates an Informer for the current registry -func NewInformer() *Informer { - return &Informer{registry: metricsRegistry} -} - -// GetMessageLockRenewedFailureCount retrieves the current value of the MessageLockRenewedFailureCount metric -func (i *Informer) GetMessageLockRenewedFailureCount() (float64, error) { - var total float64 - collect(i.registry.MessageLockRenewedCount, func(m dto.Metric) { - if !hasLabel(m, successLabel, "false") { - return - } - total += m.GetCounter().GetValue() - }) - return total, nil -} - -func hasLabel(m dto.Metric, key string, value string) bool { - for _, pair := range m.Label { - if pair == nil { - continue - } - if pair.GetName() == key && pair.GetValue() == value { - return true - } - } - return false -} - -// collect calls the function for each metric associated with the Collector -func collect(col prom.Collector, do func(dto.Metric)) { - c := make(chan prom.Metric) - go func(c chan prom.Metric) { - col.Collect(c) - close(c) - }(c) - for x := range c { // eg range across distinct label vector values - m := dto.Metric{} - _ = x.Write(&m) - do(m) - } + sender.Metric.Init(reg) + processor.Metric.Init(reg) } diff --git a/v2/metrics/registry_test.go b/v2/metrics/registry_test.go index 6988c480..323b6a24 100644 --- a/v2/metrics/registry_test.go +++ b/v2/metrics/registry_test.go @@ -3,7 +3,6 @@ package metrics import ( "testing" - "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" . "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus" ) @@ -24,70 +23,9 @@ func (f *fakeRegistry) Unregister(c prometheus.Collector) bool { panic("implement me") } -func TestRegistry_Init(t *testing.T) { - g := NewWithT(t) - r := newRegistry() - fRegistry := &fakeRegistry{} - g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic()) - g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic()) - g.Expect(fRegistry.collectors).To(HaveLen(5)) - Processor.IncMessageReceived(10) - -} - func TestRegister(t *testing.T) { g := NewWithT(t) reg := &fakeRegistry{} g.Expect(func() { Register(reg) }).ToNot(Panic()) - g.Expect(reg.collectors).To(HaveLen(5)) -} - -func TestMetrics(t *testing.T) { - type testcase struct { - name string - msg *azservicebus.ReceivedMessage - } - for _, tc := range []testcase{ - { - name: "no type property", - msg: &azservicebus.ReceivedMessage{}, - }, - { - name: "with type property", - msg: &azservicebus.ReceivedMessage{ - ApplicationProperties: map[string]interface{}{ - "type": "someType", - }, - }, - }, - } { - g := NewWithT(t) - r := newRegistry() - registerer := prometheus.NewRegistry() - informer := &Informer{registry: r} - - // before init - count, err := informer.GetMessageLockRenewedFailureCount() - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(count).To(Equal(float64(0))) - - // after init, count 0 - g.Expect(func() { r.Init(registerer) }).ToNot(Panic()) - count, err = informer.GetMessageLockRenewedFailureCount() - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(count).To(Equal(float64(0))) - - // count incremented - r.IncMessageLockRenewedFailure(tc.msg) - count, err = informer.GetMessageLockRenewedFailureCount() - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(count).To(Equal(float64(1))) - - // count failure only - r.IncMessageLockRenewedSuccess(tc.msg) - count, err = informer.GetMessageLockRenewedFailureCount() - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(count).To(Equal(float64(1))) - } - + g.Expect(reg.collectors).To(HaveLen(6)) } diff --git a/v2/metrics/sender/types.go b/v2/metrics/sender/types.go new file mode 100644 index 00000000..d1ca3cc0 --- /dev/null +++ b/v2/metrics/sender/types.go @@ -0,0 +1,108 @@ +package sender + +import ( + prom "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +const ( + subsystem = "goshuttle_handler" + successLabel = "success" +) + +var ( + metricsRegistry = newRegistry() + // Processor exposes a Recorder interface to manipulate the Processor metrics. + Metric Recorder = metricsRegistry +) + +func newRegistry() *Registry { + return &Registry{ + MessageSentCount: prom.NewCounterVec(prom.CounterOpts{ + Name: "message_sent_total", + Help: "total number of messages sent by the sender", + Subsystem: subsystem, + }, []string{successLabel}), + } +} + +func (m *Registry) Init(reg prom.Registerer) { + reg.MustRegister( + m.MessageSentCount, + ) +} + +type Registry struct { + MessageSentCount *prom.CounterVec +} + +// Recorder allows to initialize the metric registry and increase/decrease the registered metrics at runtime. +type Recorder interface { + Init(registerer prom.Registerer) + IncSendMessageSuccessCount() + IncSendMessageFailureCount() +} + +// IncSendMessageSuccessCount increases the MessageSentCount metric with success == true +func (m *Registry) IncSendMessageSuccessCount() { + m.MessageSentCount.With( + prom.Labels{ + successLabel: "true", + }).Inc() +} + +// IncSendMessageFailureCount increases the MessageSentCount metric with success == false +func (m *Registry) IncSendMessageFailureCount() { + m.MessageSentCount.With( + prom.Labels{ + successLabel: "false", + }).Inc() +} + +// Informer allows to inspect metrics value stored in the registry at runtime +type Informer struct { + registry *Registry +} + +// NewInformer creates an Informer for the current registry +func NewInformer() *Informer { + return &Informer{registry: metricsRegistry} +} + +// GetSendMessageFailureCount returns the total number of messages sent by the sender with success == false +func (i *Informer) GetSendMessageFailureCount() (float64, error) { + var total float64 + collect(i.registry.MessageSentCount, func(m dto.Metric) { + if !hasLabel(m, successLabel, "false") { + return + } + total += m.GetCounter().GetValue() + }) + return total, nil +} + +func hasLabel(m dto.Metric, key string, value string) bool { + for _, pair := range m.Label { + if pair == nil { + continue + } + if pair.GetName() == key && pair.GetValue() == value { + return true + } + } + return false +} + +// collect calls the function for each metric associated with the Collector +func collect(col prom.Collector, do func(dto.Metric)) { + c := make(chan prom.Metric) + go func(c chan prom.Metric) { + col.Collect(c) + close(c) + }(c) + for x := range c { // eg range across distinct label vector values + m := dto.Metric{} + _ = x.Write(&m) + do(m) + } +} diff --git a/v2/metrics/sender/types_test.go b/v2/metrics/sender/types_test.go new file mode 100644 index 00000000..122191bc --- /dev/null +++ b/v2/metrics/sender/types_test.go @@ -0,0 +1,64 @@ +package sender + +import ( + "testing" + + . "github.com/onsi/gomega" + "github.com/prometheus/client_golang/prometheus" +) + +type fakeRegistry struct { + collectors []prometheus.Collector +} + +func (f *fakeRegistry) Register(c prometheus.Collector) error { + panic("implement me") +} + +func (f *fakeRegistry) MustRegister(c ...prometheus.Collector) { + f.collectors = append(f.collectors, c...) +} + +func (f *fakeRegistry) Unregister(c prometheus.Collector) bool { + panic("implement me") +} + +func TestRegistry_Init(t *testing.T) { + g := NewWithT(t) + r := newRegistry() + fRegistry := &fakeRegistry{} + g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic()) + g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic()) + g.Expect(fRegistry.collectors).To(HaveLen(1)) + Metric.IncSendMessageSuccessCount() +} + +func TestMetrics(t *testing.T) { + g := NewWithT(t) + r := newRegistry() + registerer := prometheus.NewRegistry() + informer := &Informer{registry: r} + + // before init + count, err := informer.GetSendMessageFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(0))) + + // after init, count 0 + g.Expect(func() { r.Init(registerer) }).ToNot(Panic()) + count, err = informer.GetSendMessageFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(0))) + + // count incremented + r.IncSendMessageFailureCount() + count, err = informer.GetSendMessageFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(1))) + + // count failure only + r.IncSendMessageSuccessCount() + count, err = informer.GetSendMessageFailureCount() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(count).To(Equal(float64(1))) +} diff --git a/v2/processor.go b/v2/processor.go index c78a925f..9b2a900f 100644 --- a/v2/processor.go +++ b/v2/processor.go @@ -7,8 +7,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" - - "github.com/Azure/go-shuttle/v2/metrics" + "github.com/Azure/go-shuttle/v2/metrics/processor" ) type Receiver interface { @@ -82,7 +81,7 @@ func (p *Processor) Start(ctx context.Context) error { return err } log(ctx, fmt.Sprintf("received %d messages - initial", len(messages))) - metrics.Processor.IncMessageReceived(float64(len(messages))) + processor.Metric.IncMessageReceived(float64(len(messages))) for _, msg := range messages { p.process(ctx, msg) } @@ -98,7 +97,7 @@ func (p *Processor) Start(ctx context.Context) error { return err } log(ctx, fmt.Sprintf("received %d messages from processor loop", len(messages))) - metrics.Processor.IncMessageReceived(float64(len(messages))) + processor.Metric.IncMessageReceived(float64(len(messages))) for _, msg := range messages { p.process(ctx, msg) } @@ -119,10 +118,10 @@ func (p *Processor) process(ctx context.Context, message *azservicebus.ReceivedM defer cancel() defer func() { <-p.concurrencyTokens - metrics.Processor.IncMessageHandled(message) - metrics.Processor.DecConcurrentMessageCount(message) + processor.Metric.IncMessageHandled(message) + processor.Metric.DecConcurrentMessageCount(message) }() - metrics.Processor.IncConcurrentMessageCount(message) + processor.Metric.IncConcurrentMessageCount(message) p.handle.Handle(msgContext, p.receiver, message) }() }