🐛 fix: do not count context errors as failure to renew a lock (#214)
* do not count context errors as failure to renew a lock (see the sketch below)
* add tests for newly exposed funcs
serbrech authored Apr 8, 2024
1 parent 280b617 commit d7a7af0
Showing 4 changed files with 75 additions and 20 deletions.
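
The core of the change is visible in the v2/lockrenewer.go hunk below: when RenewMessageLock fails with a context error, the renewer now stops instead of recording a renewal failure. A minimal self-contained sketch of that classification (the isContextErr helper is hypothetical; the diff inlines the check):

package example

import (
	"context"
	"errors"
)

// isContextErr mirrors the check added in startPeriodicRenewal: a renewal
// attempt that fails because the context was canceled or its deadline was
// exceeded is not treated as a lock renewal failure.
func isContextErr(err error) bool {
	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

In the renewal loop, a context error triggers plr.stop(ctx) and a continue so the next iteration exits cleanly; every other error is still logged and counted via IncMessageLockRenewedFailure.
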
24 changes: 20 additions & 4 deletions v2/lockrenewer.go
@@ -9,9 +9,10 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/go-shuttle/v2/metrics/processor"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/Azure/go-shuttle/v2/metrics/processor"
)

// LockRenewer abstracts the servicebus receiver client to only expose lock renewal
@@ -26,25 +27,33 @@ type LockRenewalOptions struct {
// CancelMessageContextOnStop will cancel the downstream message context when the renewal handler is stopped.
// Defaults to true.
CancelMessageContextOnStop *bool
// MetricRecorder allows passing a custom metric recorder for the LockRenewer.
// Defaults to the processor.Metric instance.
MetricRecorder processor.Recorder
}

// NewLockRenewalHandler returns a middleware handler that will renew the lock on the message at the specified interval.
func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc {
interval := 10 * time.Second
cancelMessageContextOnStop := true
metricRecorder := processor.Metric
if options != nil {
if options.Interval != nil {
interval = *options.Interval
}
if options.CancelMessageContextOnStop != nil {
cancelMessageContextOnStop = *options.CancelMessageContextOnStop
}
if options.MetricRecorder != nil {
metricRecorder = options.MetricRecorder
}
}
return func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
plr := &peekLockRenewer{
next: handler,
lockRenewer: lockRenewer,
renewalInterval: &interval,
metrics: metricRecorder,
cancelMessageCtxOnStop: cancelMessageContextOnStop,
stopped: make(chan struct{}, 1), // buffered channel to ensure we are not blocking
}
@@ -74,6 +83,7 @@ type peekLockRenewer struct {
next Handler
lockRenewer LockRenewer
renewalInterval *time.Duration
metrics processor.Recorder
alive atomic.Bool
cancelMessageCtxOnStop bool
cancelMessageCtx func()
@@ -124,7 +134,13 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
err := plr.lockRenewer.RenewMessageLock(ctx, message, nil)
if err != nil {
log(ctx, fmt.Sprintf("failed to renew lock: %s", err))
processor.Metric.IncMessageLockRenewedFailure(message)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// if the error is a context error
// we stop and let the next loop iteration handle the exit.
plr.stop(ctx)
continue
}
plr.metrics.IncMessageLockRenewedFailure(message)
// The context is canceled when the message handler returns from the processor.
// This can happen if we already entered the interval case when the message processing completes.
// The best we can do is log and retry on the next tick. The sdk already retries operations on recoverable network errors.
@@ -140,14 +156,14 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
continue
}
span.AddEvent("message lock renewed", trace.WithAttributes(attribute.Int("count", count)))
processor.Metric.IncMessageLockRenewedSuccess(message)
plr.metrics.IncMessageLockRenewedSuccess(message)
case <-ctx.Done():
log(ctx, "context done: stopping periodic renewal")
span.AddEvent("context done: stopping message lock renewal")
err := ctx.Err()
if errors.Is(err, context.DeadlineExceeded) {
span.RecordError(err)
processor.Metric.IncMessageDeadlineReachedCount(message)
plr.metrics.IncMessageDeadlineReachedCount(message)
}
plr.stop(ctx)
case <-plr.stopped:
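
A hedged usage sketch of the new MetricRecorder option (the NewRenewalMiddleware helper and its parameters are hypothetical; NewLockRenewalHandler, LockRenewalOptions, and processor.NewRegistry come from this change). It wires lock-renewal metrics into a caller-owned registry instead of the package-level processor.Metric:

package example

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/Azure/go-shuttle/v2"
	"github.com/Azure/go-shuttle/v2/metrics/processor"
)

// NewRenewalMiddleware builds a lock renewal handler whose metrics are
// recorded on an isolated Registry rather than the default processor.Metric.
func NewRenewalMiddleware(renewer shuttle.LockRenewer, next shuttle.Handler) shuttle.HandlerFunc {
	interval := 10 * time.Second
	reg := processor.NewRegistry()
	reg.Init(prometheus.NewRegistry()) // register the counters on a caller-owned registerer
	return shuttle.NewLockRenewalHandler(renewer, &shuttle.LockRenewalOptions{
		Interval:       &interval,
		MetricRecorder: reg,
	}, next)
}

The updated Test_RenewPeriodically_Error below uses this same wiring to assert on the failure counter through an Informer bound to the same registry.
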
46 changes: 36 additions & 10 deletions v2/lockrenewer_test.go
@@ -11,8 +11,10 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"

"github.com/Azure/go-shuttle/v2"
"github.com/Azure/go-shuttle/v2/metrics/processor"
)

type fakeSBLockRenewer struct {
@@ -150,24 +152,40 @@ func Test_RenewPeriodically_Error(t *testing.T) {
isRenewerCanceled bool
cancelCtxOnStop *bool
gotMessageCtx context.Context
verify func(g Gomega, tc *testCase)
verify func(g Gomega, tc *testCase, metrics *processor.Informer)
}
testCases := []testCase{
{
name: "continue periodic renewal on unknown error",
renewer: &fakeSBLockRenewer{Err: fmt.Errorf("unknown error")},
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Eventually(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
},
},
{
name: "stop periodic renewal on context canceled",
isRenewerCanceled: false,
renewer: &fakeSBLockRenewer{Err: context.Canceled},
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) {
g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1)),
"should not attempt to renew")
g.Expect(metrics.GetMessageLockRenewedFailureCount()).To(Equal(float64(0)),
"should not record failure metric")
},
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
},
},
{
name: "stop periodic renewal on context canceled",
isRenewerCanceled: true,
renewer: &fakeSBLockRenewer{Err: context.Canceled},
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(0))) },
130*time.Millisecond,
@@ -177,7 +195,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
{
name: "stop periodic renewal on permanent error (lockLost)",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
130*time.Millisecond,
@@ -187,7 +205,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
{
name: "cancel message context on stop by default",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
130*time.Millisecond,
@@ -199,7 +217,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
name: "does not cancel message context on stop if disabled",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
cancelCtxOnStop: to.Ptr(false),
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Consistently(
func(g Gomega) {
g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1)))
@@ -212,7 +230,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
{
name: "continue periodic renewal on transient error (timeout)",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeTimeout}},
verify: func(g Gomega, tc *testCase) {
verify: func(g Gomega, tc *testCase, metrics *processor.Informer) {
g.Eventually(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
140*time.Millisecond,
@@ -225,7 +243,15 @@ func Test_RenewPeriodically_Error(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
interval := 50 * time.Millisecond
lr := shuttle.NewLockRenewalHandler(tc.renewer, &shuttle.LockRenewalOptions{Interval: &interval, CancelMessageContextOnStop: tc.cancelCtxOnStop},
reg := processor.NewRegistry()
reg.Init(prometheus.NewRegistry())
informer := processor.NewInformerFor(reg)
lr := shuttle.NewLockRenewalHandler(tc.renewer,
&shuttle.LockRenewalOptions{
Interval: &interval,
CancelMessageContextOnStop: tc.cancelCtxOnStop,
MetricRecorder: reg,
},
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
tc.gotMessageCtx = ctx
@@ -237,13 +263,13 @@ func Test_RenewPeriodically_Error(t *testing.T) {
}
}))
msg := &azservicebus.ReceivedMessage{}
ctx, cancel := context.WithTimeout(context.TODO(), 200*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
if tc.isRenewerCanceled {
cancel()
}
defer cancel()
lr.Handle(ctx, &fakeSettler{}, msg)
tc.verify(NewWithT(t), &tc)
tc.verify(NewWithT(t), &tc, informer)
})
}
}
14 changes: 11 additions & 3 deletions v2/metrics/processor/types.go
@@ -17,12 +17,13 @@ const (
)

var (
metricsRegistry = newRegistry()
metricsRegistry = NewRegistry()
// Metric exposes a Recorder interface to manipulate the Processor metrics.
Metric Recorder = metricsRegistry
)

func newRegistry() *Registry {
// NewRegistry creates a new Registry with initialized prometheus counter definitions
func NewRegistry() *Registry {
return &Registry{
MessageReceivedCount: prom.NewCounterVec(prom.CounterOpts{
Name: "message_received_total",
Expand Down Expand Up @@ -59,6 +60,7 @@ func getMessageTypeLabel(msg *azservicebus.ReceivedMessage) prom.Labels {
}
}

// Init registers the counters from the Registry on the prometheus.Registerer
func (m *Registry) Init(reg prom.Registerer) {
reg.MustRegister(
m.MessageReceivedCount,
@@ -68,6 +70,7 @@ func (m *Registry) Init(reg prom.Registerer) {
m.ConcurrentMessageCount)
}

// Registry provides the prometheus metrics for the message processor
type Registry struct {
MessageReceivedCount *prom.CounterVec
MessageHandledCount *prom.CounterVec
@@ -137,7 +140,12 @@ type Informer struct {

// NewInformer creates an Informer for the current registry
func NewInformer() *Informer {
return &Informer{registry: metricsRegistry}
return NewInformerFor(metricsRegistry)
}

// NewInformerFor creates an Informer for the provided registry
func NewInformerFor(r *Registry) *Informer {
return &Informer{registry: r}
}

// GetMessageLockRenewedFailureCount retrieves the current value of the MessageLockRenewedFailureCount metric
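
A small sketch of the newly exported NewRegistry/NewInformerFor pair (the inspectRenewalFailures function and the empty placeholder message are illustrative only): a caller-owned Registry can be initialized against an isolated prometheus registry and read back through an Informer bound to it, which is what the lock renewer test above relies on:

package example

import (
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/Azure/go-shuttle/v2/metrics/processor"
)

func inspectRenewalFailures() {
	reg := processor.NewRegistry()
	reg.Init(prometheus.NewRegistry()) // isolated registry, independent of processor.Metric
	informer := processor.NewInformerFor(reg)

	msg := &azservicebus.ReceivedMessage{} // placeholder message for illustration
	reg.IncMessageLockRenewedFailure(msg)

	count, err := informer.GetMessageLockRenewedFailureCount()
	fmt.Println(count, err) // expected output: 1 <nil>
}
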
11 changes: 8 additions & 3 deletions v2/metrics/processor/types_test.go
@@ -26,13 +26,18 @@ func (f *fakeRegistry) Unregister(c prometheus.Collector) bool {

func TestRegistry_Init(t *testing.T) {
g := NewWithT(t)
r := newRegistry()
r := NewRegistry()
fRegistry := &fakeRegistry{}
g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic())
g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic())
g.Expect(fRegistry.collectors).To(HaveLen(5))
Metric.IncMessageReceived(10)
}

func TestNewInformerDefault(t *testing.T) {
i := NewInformer()
g := NewWithT(t)
g.Expect(i.registry).To(Equal(Metric))
}

func TestMetrics(t *testing.T) {
@@ -55,9 +60,9 @@ func TestMetrics(t *testing.T) {
},
} {
g := NewWithT(t)
r := newRegistry()
r := NewRegistry()
registerer := prometheus.NewRegistry()
informer := &Informer{registry: r}
informer := NewInformerFor(r)

// before init
count, err := informer.GetMessageLockRenewedFailureCount()
