Skip to content

Commit

Permalink
Adding forward DLQ to management option (#76)
Browse files Browse the repository at this point in the history
* adding forward DLQ management option

* caching suite.T()
  • Loading branch information
imiller31 authored Jan 19, 2022
1 parent 37c859c commit a88be32
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 3 deletions.
19 changes: 19 additions & 0 deletions integration/publisher_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,25 @@ func (suite *serviceBusQueueSuite) TestCreatePublisherUsingExistingQueue() {
}
}

// TestCreatePublisherWithDeadLetterForwardUsingNewQueue tests the creation of a publisher for a new queue
func (suite *serviceBusQueueSuite) TestCreatePublisherWithDeadLetterForwardUsingNewQueue() {
testContext := suite.T()
suite.Parallel()
queueName := "newQueue" + suite.TagID
_, err := queue.NewPublisher(context.Background(), queueName, suite.publisherAuthOption, publisher.WithForwardDeadLetteredMessagesTo(queueName+"DLQ", 1000))
if suite.NoError(err) {
// make sure that queue exists
ns := suite.GetNewNamespace()
tm := ns.NewQueueManager()
_, err := tm.Get(context.Background(), queueName)
require.NoError(testContext, err)

// delete new queue
err = tm.Delete(context.Background(), queueName)
require.NoError(testContext, err)
}
}

// TestPublishAfterIdle tests the creation of a publisher for an existing queue and a connection string
func (suite *serviceBusQueueSuite) TestPublishAfterIdle() {
suite.T().Parallel()
Expand Down
28 changes: 25 additions & 3 deletions queue/publisher/options.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package publisher

import (
"github.com/Azure/go-shuttle/common"
"github.com/Azure/go-shuttle/common/options/publisheropts"
"context"
"time"

"github.com/Azure/azure-service-bus-go"
servicebus "github.com/Azure/azure-service-bus-go"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-shuttle/common"
"github.com/Azure/go-shuttle/common/options/publisheropts"
)

type DeadLetterTarget struct {
}

// ManagementOption provides structure for configuring a new Publisher
type ManagementOption = publisheropts.ManagementOption

Expand Down Expand Up @@ -55,6 +59,24 @@ func WithDuplicateDetection(window *time.Duration) ManagementOption {
}
}

// WithForwardDeadLetteredMessagesTo forwards deadlettered messages to a targetable queue, the identity must have management permissions on said queue
func WithForwardDeadLetteredMessagesTo(deadLetterTargetName string, deliveryCount int) ManagementOption {
return func(p common.Publisher) error {
qm := p.Namespace().NewQueueManager()

if _, err := qm.Put(context.Background(), deadLetterTargetName, servicebus.QueueEntityWithMaxDeliveryCount(int32(deliveryCount))); err != nil {
return err
}

deadLetterTarget, err := p.Namespace().NewQueueManager().Get(context.Background(), deadLetterTargetName)
if err != nil {
return err
}
p.(QueuePublisher).AppendQueueManagementOption(servicebus.QueueEntityWithForwardDeadLetteredMessagesTo(deadLetterTarget))
return nil
}
}

// SetMessageDelay schedules a message in the future
func SetMessageDelay(delay time.Duration) Option {
return publisheropts.SetMessageDelay(delay)
Expand Down

0 comments on commit a88be32

Please sign in to comment.