diff --git a/integration/publisher_queue_test.go b/integration/publisher_queue_test.go index 7d6d65a5..82fc4f10 100644 --- a/integration/publisher_queue_test.go +++ b/integration/publisher_queue_test.go @@ -44,6 +44,25 @@ func (suite *serviceBusQueueSuite) TestCreatePublisherUsingExistingQueue() { } } +// TestCreatePublisherWithDeadLetterForwardUsingNewQueue tests the creation of a publisher for a new queue +func (suite *serviceBusQueueSuite) TestCreatePublisherWithDeadLetterForwardUsingNewQueue() { + testContext := suite.T() + suite.Parallel() + queueName := "newQueue" + suite.TagID + _, err := queue.NewPublisher(context.Background(), queueName, suite.publisherAuthOption, publisher.WithForwardDeadLetteredMessagesTo(queueName+"DLQ", 1000)) + if suite.NoError(err) { + // make sure that queue exists + ns := suite.GetNewNamespace() + tm := ns.NewQueueManager() + _, err := tm.Get(context.Background(), queueName) + require.NoError(testContext, err) + + // delete new queue + err = tm.Delete(context.Background(), queueName) + require.NoError(testContext, err) + } +} + // TestPublishAfterIdle tests the creation of a publisher for an existing queue and a connection string func (suite *serviceBusQueueSuite) TestPublishAfterIdle() { suite.T().Parallel() diff --git a/queue/publisher/options.go b/queue/publisher/options.go index 52d3b761..887bdb11 100644 --- a/queue/publisher/options.go +++ b/queue/publisher/options.go @@ -1,14 +1,18 @@ package publisher import ( - "github.com/Azure/go-shuttle/common" - "github.com/Azure/go-shuttle/common/options/publisheropts" + "context" "time" - "github.com/Azure/azure-service-bus-go" + servicebus "github.com/Azure/azure-service-bus-go" "github.com/Azure/go-autorest/autorest/adal" + "github.com/Azure/go-shuttle/common" + "github.com/Azure/go-shuttle/common/options/publisheropts" ) +type DeadLetterTarget struct { +} + // ManagementOption provides structure for configuring a new Publisher type ManagementOption = publisheropts.ManagementOption @@ -55,6 +59,24 @@ func WithDuplicateDetection(window *time.Duration) ManagementOption { } } +// WithForwardDeadLetteredMessagesTo forwards deadlettered messages to a targetable queue, the identity must have management permissions on said queue +func WithForwardDeadLetteredMessagesTo(deadLetterTargetName string, deliveryCount int) ManagementOption { + return func(p common.Publisher) error { + qm := p.Namespace().NewQueueManager() + + if _, err := qm.Put(context.Background(), deadLetterTargetName, servicebus.QueueEntityWithMaxDeliveryCount(int32(deliveryCount))); err != nil { + return err + } + + deadLetterTarget, err := p.Namespace().NewQueueManager().Get(context.Background(), deadLetterTargetName) + if err != nil { + return err + } + p.(QueuePublisher).AppendQueueManagementOption(servicebus.QueueEntityWithForwardDeadLetteredMessagesTo(deadLetterTarget)) + return nil + } +} + // SetMessageDelay schedules a message in the future func SetMessageDelay(delay time.Duration) Option { return publisheropts.SetMessageDelay(delay)