Skip to content

Commit

Permalink
add OnError hook to ManagedSettlingHandler (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
serbrech authored Apr 4, 2023
1 parent f78d0f9 commit 7258f5f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 24 deletions.
57 changes: 37 additions & 20 deletions v2/managedsettling.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type ManagedSettler struct {
func (m *ManagedSettler) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
if err := m.next(ctx, message); err != nil {
log(ctx, "error returned from the handler. Calling ManagedSettler error handler")
m.handleError(ctx, settler, message, err)
m.options.OnError(ctx, m.options, settler, message, err)
return
}
if err := settler.CompleteMessage(ctx, message, nil); err != nil {
Expand Down Expand Up @@ -70,6 +70,11 @@ func (s *ConstantDelayStrategy) GetDelay(_ uint32) time.Duration {

// ManagedSettlingOptions allows to configure the ManagedSettling middleware
type ManagedSettlingOptions struct {
// Allows to override the built-in error handling logic.
// OnError is called before any message settling action is taken.
// the ManagedSettlingOptions struct is passed as an argument so that the configuration
// like RetryDecision, RetryDelayStrategy and the post-settlement hooks can be reused and composed differently
OnError func(ctx context.Context, opts *ManagedSettlingOptions, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error)
// RetryDecision is invoked to decide whether an error should be retried.
// the default is to retry 5 times before moving the message to the deadletter.
RetryDecision RetryDecision
Expand All @@ -95,21 +100,11 @@ type ManagedSettlingOptions struct {
// the RetryDecision can be overridden and can inspect the error returned to decide to retry the message or not.
// this allows to define error types that shouldn't be retried (and moved directly to the deadletter queue)
func NewManagedSettlingHandler(opts *ManagedSettlingOptions, handler ManagedSettlingFunc) *ManagedSettler {
const (
defaultRetryDecisionMaxAttempts = 5
defaultDelay = 5 * time.Second
)
options := &ManagedSettlingOptions{
RetryDecision: &MaxAttemptsRetryDecision{MaxAttempts: defaultRetryDecisionMaxAttempts},
RetryDelayStrategy: &ConstantDelayStrategy{Delay: defaultDelay},
OnCompleted: func(_ context.Context, _ *azservicebus.ReceivedMessage) {
},
OnAbandoned: func(_ context.Context, _ *azservicebus.ReceivedMessage, _ error) {
},
OnDeadLettered: func(_ context.Context, _ *azservicebus.ReceivedMessage, _ error) {
},
}
options := defaultManagedSettlingOptions()
if opts != nil {
if opts.OnError != nil {
options.OnError = opts.OnError
}
if opts.RetryDecision != nil {
options.RetryDecision = opts.RetryDecision
}
Expand All @@ -132,26 +127,48 @@ func NewManagedSettlingHandler(opts *ManagedSettlingOptions, handler ManagedSett
}
}

func (m *ManagedSettler) handleError(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error) {
func defaultManagedSettlingOptions() *ManagedSettlingOptions {
const (
defaultRetryDecisionMaxAttempts = 5
defaultDelay = 5 * time.Second
)
return &ManagedSettlingOptions{
OnError: handleError,
RetryDecision: &MaxAttemptsRetryDecision{MaxAttempts: defaultRetryDecisionMaxAttempts},
RetryDelayStrategy: &ConstantDelayStrategy{Delay: defaultDelay},
OnCompleted: func(_ context.Context, _ *azservicebus.ReceivedMessage) {
},
OnAbandoned: func(_ context.Context, _ *azservicebus.ReceivedMessage, _ error) {
},
OnDeadLettered: func(_ context.Context, _ *azservicebus.ReceivedMessage, _ error) {
},
}
}

func handleError(ctx context.Context,
options *ManagedSettlingOptions,
settler MessageSettler,
message *azservicebus.ReceivedMessage,
handleErr error) {
if handleErr == nil {
handleErr = fmt.Errorf("nil error: %w", handleErr)
}
if !m.options.RetryDecision.CanRetry(handleErr, message) {
if !options.RetryDecision.CanRetry(handleErr, message) {
log(ctx, "moving message to dead letter queue because processing failed to an error: %s", handleErr)
deadLetterSettlement.settle(ctx, settler, message, &azservicebus.DeadLetterOptions{
Reason: to.Ptr("ManagedSettlingHandlerDeadLettering"),
ErrorDescription: to.Ptr(handleErr.Error()),
PropertiesToModify: nil,
})
// this could be a special hook to have more control on deadlettering, but keeping it simple for now
m.options.OnDeadLettered(ctx, message, handleErr)
options.OnDeadLettered(ctx, message, handleErr)
return
}
// the delay is implemented as an in-memory sleep before calling abandon.
// this will continue renewing the lock on the message while we wait for this delay to pass.
delay := m.options.RetryDelayStrategy.GetDelay(message.DeliveryCount)
delay := options.RetryDelayStrategy.GetDelay(message.DeliveryCount)
log(ctx, "delay strategy return delay of %s", delay)
time.Sleep(delay)
abandonSettlement.settle(ctx, settler, message, nil)
m.options.OnAbandoned(ctx, message, handleErr)
options.OnAbandoned(ctx, message, handleErr)
}
23 changes: 19 additions & 4 deletions v2/managedsettling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,8 @@ func TestManagedSettler_Handle(t *testing.T) {
}

func Test_NilErr_WrappedInDeadLetter(t *testing.T) {
h := NewManagedSettlingHandler(nil, nil)
settler := &fakeSettler{}
h.handleError(context.TODO(), settler, &azservicebus.ReceivedMessage{DeliveryCount: 6}, nil)
handleError(context.TODO(), defaultManagedSettlingOptions(), settler, &azservicebus.ReceivedMessage{DeliveryCount: 6}, nil)
g := NewWithT(t)
g.Expect(*settler.deadletterOptions.ErrorDescription).To(HavePrefix("nil error:"))
}
Expand All @@ -173,15 +172,31 @@ func TestDefaultOptions_CallDefaultHooks(t *testing.T) {
g.Expect(settler.completed).To(BeTrue())

settler = &fakeSettler{}
h.handleError(context.TODO(), settler, &azservicebus.ReceivedMessage{DeliveryCount: 0}, fmt.Errorf("oops"))
defaultOptions := defaultManagedSettlingOptions()
handleError(context.TODO(), defaultOptions, settler, &azservicebus.ReceivedMessage{DeliveryCount: 0}, fmt.Errorf("oops"))
g.Expect(settler.abandoned).To(BeTrue())

settler = &fakeSettler{}
h.handleError(context.TODO(), settler, &azservicebus.ReceivedMessage{DeliveryCount: 6}, fmt.Errorf("oops"))
handleError(context.TODO(), defaultOptions, settler, &azservicebus.ReceivedMessage{DeliveryCount: 6}, fmt.Errorf("oops"))
g.Expect(settler.deadlettered).To(BeTrue())
g.Expect(*settler.deadletterOptions.ErrorDescription).To(Equal("oops"))
}

func TestOnErrorOverride(t *testing.T) {
g := NewWithT(t)
settler := &fakeSettler{}
opts := defaultManagedSettlingOptions()
var onErrorCalled bool
opts.OnError = func(ctx context.Context, opts *ManagedSettlingOptions, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error) {
onErrorCalled = true
}
h := NewManagedSettlingHandler(opts, func(_ context.Context, _ *azservicebus.ReceivedMessage) error {
return fmt.Errorf("failed")
})
h.Handle(context.Background(), settler, &azservicebus.ReceivedMessage{})
g.Expect(onErrorCalled).To(BeTrue())
}

func TestMaxAttemptsRetryDecision(t *testing.T) {
for _, tc := range []struct {
maxAttempts uint32
Expand Down

0 comments on commit 7258f5f

Please sign in to comment.