From d7a7af06a602f13f6c397ab6e16d002cd5411c70 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Erbrech?=
Date: Tue, 9 Apr 2024 09:48:31 +1000
Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix:=20do=20not=20count=20contex?=
 =?UTF-8?q?t=20errors=20as=20failure=20to=20renew=20a=20lock=20(#214)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* do not count context errors as failure to renew a lock

* add tests for newly exposed funcs
---
 v2/lockrenewer.go                  | 24 +++++++++++++---
 v2/lockrenewer_test.go             | 46 +++++++++++++++++++++++-------
 v2/metrics/processor/types.go      | 14 +++++++--
 v2/metrics/processor/types_test.go | 11 +++++--
 4 files changed, 75 insertions(+), 20 deletions(-)

diff --git a/v2/lockrenewer.go b/v2/lockrenewer.go
index 67ff7bdf..d5c6b45f 100644
--- a/v2/lockrenewer.go
+++ b/v2/lockrenewer.go
@@ -9,9 +9,10 @@ import (
 
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
-	"github.com/Azure/go-shuttle/v2/metrics/processor"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
+
+	"github.com/Azure/go-shuttle/v2/metrics/processor"
 )
 
 // LockRenewer abstracts the servicebus receiver client to only expose lock renewal
@@ -26,12 +27,16 @@ type LockRenewalOptions struct {
 	// CancelMessageContextOnStop will cancel the downstream message context when the renewal handler is stopped.
 	// Defaults to true.
 	CancelMessageContextOnStop *bool
+	// MetricRecorder allows passing a custom metric recorder for the LockRenewer.
+	// Defaults to the processor.Metric instance.
+	MetricRecorder processor.Recorder
 }
 
 // NewLockRenewalHandler returns a middleware handler that will renew the lock on the message at the specified interval.
 func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc {
 	interval := 10 * time.Second
 	cancelMessageContextOnStop := true
+	metricRecorder := processor.Metric
 	if options != nil {
 		if options.Interval != nil {
 			interval = *options.Interval
@@ -39,12 +44,16 @@ func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions,
 		if options.CancelMessageContextOnStop != nil {
 			cancelMessageContextOnStop = *options.CancelMessageContextOnStop
 		}
+		if options.MetricRecorder != nil {
+			metricRecorder = options.MetricRecorder
+		}
 	}
 	return func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
 		plr := &peekLockRenewer{
 			next:                   handler,
 			lockRenewer:            lockRenewer,
 			renewalInterval:        &interval,
+			metrics:                metricRecorder,
 			cancelMessageCtxOnStop: cancelMessageContextOnStop,
 			stopped:                make(chan struct{}, 1), // buffered channel to ensure we are not blocking
 		}
@@ -74,6 +83,7 @@ type peekLockRenewer struct {
 	next                   Handler
 	lockRenewer            LockRenewer
 	renewalInterval        *time.Duration
+	metrics                processor.Recorder
 	alive                  atomic.Bool
 	cancelMessageCtxOnStop bool
 	cancelMessageCtx       func()
@@ -124,7 +134,13 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
 			err := plr.lockRenewer.RenewMessageLock(ctx, message, nil)
 			if err != nil {
 				log(ctx, fmt.Sprintf("failed to renew lock: %s", err))
-				processor.Metric.IncMessageLockRenewedFailure(message)
+				if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+					// if the error is a context error,
+					// we stop and let the next loop iteration handle the exit.
+					plr.stop(ctx)
+					continue
+				}
+				plr.metrics.IncMessageLockRenewedFailure(message)
 				// The context is canceled when the message handler returns from the processor.
 				// This can happen if we already entered the interval case when the message processing completes.
 				// The best we can do is log and retry on the next tick. The sdk already retries operations on recoverable network errors.
@@ -140,14 +156,14 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
 				continue
 			}
 			span.AddEvent("message lock renewed", trace.WithAttributes(attribute.Int("count", count)))
-			processor.Metric.IncMessageLockRenewedSuccess(message)
+			plr.metrics.IncMessageLockRenewedSuccess(message)
 		case <-ctx.Done():
 			log(ctx, "context done: stopping periodic renewal")
 			span.AddEvent("context done: stopping message lock renewal")
 			err := ctx.Err()
 			if errors.Is(err, context.DeadlineExceeded) {
 				span.RecordError(err)
-				processor.Metric.IncMessageDeadlineReachedCount(message)
+				plr.metrics.IncMessageDeadlineReachedCount(message)
 			}
 			plr.stop(ctx)
 		case <-plr.stopped:
diff --git a/v2/lockrenewer_test.go b/v2/lockrenewer_test.go
index 1c4cf80e..a444f57c 100644
--- a/v2/lockrenewer_test.go
+++ b/v2/lockrenewer_test.go
@@ -11,8 +11,10 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
 	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
 	. "github.com/onsi/gomega"
+	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/Azure/go-shuttle/v2"
+	"github.com/Azure/go-shuttle/v2/metrics/processor"
 )
 
 type fakeSBLockRenewer struct {
@@ -150,24 +152,40 @@ func Test_RenewPeriodically_Error(t *testing.T) {
 		isRenewerCanceled bool
 		cancelCtxOnStop   *bool
 		gotMessageCtx     context.Context
-		verify            func(g Gomega, tc *testCase)
+		verify            func(g Gomega, tc *testCase, metrics *processor.Informer)
 	}
 	testCases := []testCase{
 		{
 			name:    "continue periodic renewal on unknown error",
 			renewer: &fakeSBLockRenewer{Err: fmt.Errorf("unknown error")},
-			verify: func(g Gomega, tc *testCase) {
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
 				g.Eventually(
 					func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
 					130*time.Millisecond,
 					20*time.Millisecond).Should(Succeed())
 			},
 		},
+		{
+			name:              "stop periodic renewal on context canceled",
+			isRenewerCanceled: false,
+			renewer:           &fakeSBLockRenewer{Err: context.Canceled},
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
+				g.Consistently(
+					func(g Gomega) {
+						g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1)),
+							"should not attempt to renew")
+						g.Expect(metrics.GetMessageLockRenewedFailureCount()).To(Equal(float64(0)),
+							"should not record failure metric")
+					},
+					130*time.Millisecond,
+					20*time.Millisecond).Should(Succeed())
+			},
+		},
 		{
 			name:              "stop periodic renewal on context canceled",
 			isRenewerCanceled: true,
 			renewer:           &fakeSBLockRenewer{Err: context.Canceled},
-			verify: func(g Gomega, tc *testCase) {
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
 				g.Consistently(
 					func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(0))) },
 					130*time.Millisecond,
@@ -177,7 +195,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
 		{
 			name:    "stop periodic renewal on permanent error (lockLost)",
 			renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
-			verify: func(g Gomega, tc *testCase) {
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
 				g.Consistently(
 					func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
 					130*time.Millisecond,
@@ -187,7 +205,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
 		{
 			name:    "cancel message context on stop by default",
 			renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
-			verify: func(g Gomega, tc *testCase) {
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
 				g.Consistently(
 					func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
 					130*time.Millisecond,
@@ -199,7 +217,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
 			name:            "does not cancel message context on stop if disabled",
 			renewer:         &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
 			cancelCtxOnStop: to.Ptr(false),
-			verify: func(g Gomega, tc *testCase) {
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
 				g.Consistently(
 					func(g Gomega) {
 						g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1)))
@@ -212,7 +230,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
 		{
 			name:    "continue periodic renewal on transient error (timeout)",
 			renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeTimeout}},
-			verify: func(g Gomega, tc *testCase) {
+			verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
 				g.Eventually(
 					func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
 					140*time.Millisecond,
@@ -225,7 +243,15 @@ func Test_RenewPeriodically_Error(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			t.Parallel()
 			interval := 50 * time.Millisecond
-			lr := shuttle.NewLockRenewalHandler(tc.renewer, &shuttle.LockRenewalOptions{Interval: &interval, CancelMessageContextOnStop: tc.cancelCtxOnStop},
+			reg := processor.NewRegistry()
+			reg.Init(prometheus.NewRegistry())
+			informer := processor.NewInformerFor(reg)
+			lr := shuttle.NewLockRenewalHandler(tc.renewer,
+				&shuttle.LockRenewalOptions{
+					Interval:                   &interval,
+					CancelMessageContextOnStop: tc.cancelCtxOnStop,
+					MetricRecorder:             reg,
+				},
 				shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
 					message *azservicebus.ReceivedMessage) {
 					tc.gotMessageCtx = ctx
@@ -237,13 +263,13 @@ func Test_RenewPeriodically_Error(t *testing.T) {
 				}
 			}))
 			msg := &azservicebus.ReceivedMessage{}
-			ctx, cancel := context.WithTimeout(context.TODO(), 200*time.Millisecond)
+			ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
 			if tc.isRenewerCanceled {
 				cancel()
 			}
 			defer cancel()
 			lr.Handle(ctx, &fakeSettler{}, msg)
-			tc.verify(NewWithT(t), &tc)
+			tc.verify(NewWithT(t), &tc, informer)
 		})
 	}
 }
diff --git a/v2/metrics/processor/types.go b/v2/metrics/processor/types.go
index c0d93de9..c29090d3 100644
--- a/v2/metrics/processor/types.go
+++ b/v2/metrics/processor/types.go
@@ -17,12 +17,13 @@ const (
 )
 
 var (
-	metricsRegistry = newRegistry()
+	metricsRegistry = NewRegistry()
 	// Metric exposes a Recorder interface to manipulate the Processor metrics.
 	Metric Recorder = metricsRegistry
 )
 
-func newRegistry() *Registry {
+// NewRegistry creates a new Registry with initialized prometheus counter definitions
+func NewRegistry() *Registry {
 	return &Registry{
 		MessageReceivedCount: prom.NewCounterVec(prom.CounterOpts{
 			Name:      "message_received_total",
@@ -59,6 +60,7 @@ func getMessageTypeLabel(msg *azservicebus.ReceivedMessage) prom.Labels {
 	}
 }
 
+// Init registers the counters from the Registry on the prometheus.Registerer
 func (m *Registry) Init(reg prom.Registerer) {
 	reg.MustRegister(
 		m.MessageReceivedCount,
@@ -68,6 +70,7 @@ func (m *Registry) Init(reg prom.Registerer) {
 		m.ConcurrentMessageCount)
 }
 
+// Registry provides the prometheus metrics for the message processor
 type Registry struct {
 	MessageReceivedCount        *prom.CounterVec
 	MessageHandledCount         *prom.CounterVec
@@ -137,7 +140,12 @@ type Informer struct {
 
 // NewInformer creates an Informer for the current registry
 func NewInformer() *Informer {
-	return &Informer{registry: metricsRegistry}
+	return NewInformerFor(metricsRegistry)
+}
+
+// NewInformerFor creates an Informer for the given Registry
+func NewInformerFor(r *Registry) *Informer {
+	return &Informer{registry: r}
 }
 
 // GetMessageLockRenewedFailureCount retrieves the current value of the MessageLockRenewedFailureCount metric
diff --git a/v2/metrics/processor/types_test.go b/v2/metrics/processor/types_test.go
index 18830f4d..e0b45e7f 100644
--- a/v2/metrics/processor/types_test.go
+++ b/v2/metrics/processor/types_test.go
@@ -26,13 +26,18 @@ func (f *fakeRegistry) Unregister(c prometheus.Collector) bool {
 }
 
 func TestRegistry_Init(t *testing.T) {
 	g := NewWithT(t)
-	r := newRegistry()
+	r := NewRegistry()
 	fRegistry := &fakeRegistry{}
 	g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic())
 	g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic())
 	g.Expect(fRegistry.collectors).To(HaveLen(5))
 	Metric.IncMessageReceived(10)
+}
+func TestNewInformerDefault(t *testing.T) {
+	i := NewInformer()
+	g := NewWithT(t)
+	g.Expect(i.registry).To(Equal(Metric))
 }
 
 func TestMetrics(t *testing.T) {
@@ -55,9 +60,9 @@
 		},
 	} {
 		g := NewWithT(t)
-		r := newRegistry()
+		r := NewRegistry()
 		registerer := prometheus.NewRegistry()
-		informer := &Informer{registry: r}
+		informer := NewInformerFor(r)
 
 		// before init
 		count, err := informer.GetMessageLockRenewedFailureCount()
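
Reviewer note (not part of the patch): the sketch below shows how a consumer might wire the new MetricRecorder option together with the newly exported NewRegistry and NewInformerFor helpers once this change lands. The package name, the customRenewalHandler function, and the receiver/next parameters are illustrative assumptions; only the go-shuttle and processor calls come from the diff above.

package example

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/Azure/go-shuttle/v2"
	"github.com/Azure/go-shuttle/v2/metrics/processor"
)

// customRenewalHandler records lock-renewal metrics on a caller-owned Registry
// instead of the package-level processor.Metric (the default when MetricRecorder is nil).
func customRenewalHandler(receiver shuttle.LockRenewer, next shuttle.Handler) (shuttle.HandlerFunc, *processor.Informer) {
	reg := processor.NewRegistry()
	reg.Init(prometheus.NewRegistry()) // register the counters on a dedicated Prometheus registry

	interval := 10 * time.Second
	handler := shuttle.NewLockRenewalHandler(receiver,
		&shuttle.LockRenewalOptions{
			Interval:       &interval,
			MetricRecorder: reg, // option added by this patch
		},
		next)

	// NewInformerFor (also exported by this patch) reads the counters back from the same Registry.
	return handler, processor.NewInformerFor(reg)
}

This mirrors what the updated Test_RenewPeriodically_Error does: each caller gets an isolated Registry plus an Informer over it, so failure counts can be asserted (or scraped) without touching the global processor.Metric.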