Skip to content

Commit

Permalink
[feat]✨ Add SendTimeout option on sender (#148)
Browse files Browse the repository at this point in the history
* add default send timeout option to sender
* improve default
  • Loading branch information
serbrech authored Aug 3, 2023
1 parent 2885d79 commit d38b80f
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 1 deletion.
20 changes: 19 additions & 1 deletion v2/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

const (
msgTypeField = "type"
msgTypeField = "type"
defaultSendTimeout = 10 * time.Second
)

// MessageBody is a type to represent that an input message body can be of any type
Expand All @@ -35,13 +36,20 @@ type SenderOptions struct {
Marshaller Marshaller
// EnableTracingPropagation automatically applies WithTracePropagation option on all message sent through this sender
EnableTracingPropagation bool
// SendTimeout is the timeout value used on the context that sends messages
// Defaults to 10 seconds if not set or 0
// Disabled when set to a negative value
SendTimeout time.Duration
}

// NewSender takes in a Sender and a Marshaller to create a new object that can send messages to the ServiceBus queue
func NewSender(sender AzServiceBusSender, options *SenderOptions) *Sender {
if options == nil {
options = &SenderOptions{Marshaller: &DefaultJSONMarshaller{}}
}
if options.SendTimeout == 0 {
options.SendTimeout = defaultSendTimeout
}
return &Sender{sbSender: sender, options: options}
}

Expand All @@ -52,6 +60,11 @@ func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...fun
if err != nil {
return err
}
if d.options.SendTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, d.options.SendTimeout)
defer cancel()
}
if err := d.sbSender.SendMessage(ctx, msg, nil); err != nil { // sendMessageOptions currently does nothing
return fmt.Errorf("failed to send message: %w", err)
}
Expand Down Expand Up @@ -98,6 +111,11 @@ func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.
return err
}
}
if d.options.SendTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, d.options.SendTimeout)
defer cancel()
}
if err := d.sbSender.SendMessageBatch(ctx, batch, nil); err != nil {
return fmt.Errorf("failed to send message batch: %w", err)
}
Expand Down
89 changes: 89 additions & 0 deletions v2/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,83 @@ func TestSender_SenderTracePropagation(t *testing.T) {
g.Expect(msg.ApplicationProperties["traceparent"]).ToNot(BeNil())
}

func TestSender_WithDefaultSendTimeout(t *testing.T) {
g := NewWithT(t)
azSender := &fakeAzSender{
DoSendMessage: func(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error {
dl, ok := ctx.Deadline()
g.Expect(ok).To(BeTrue())
g.Expect(dl).To(BeTemporally("~", time.Now().Add(defaultSendTimeout), time.Second))
return nil
},
DoSendMessageBatch: func(ctx context.Context, messages *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error {
dl, ok := ctx.Deadline()
g.Expect(ok).To(BeTrue())
g.Expect(dl).To(BeTemporally("~", time.Now().Add(defaultSendTimeout), time.Second))
return nil
},
}
sender := NewSender(azSender, &SenderOptions{
Marshaller: &DefaultJSONMarshaller{},
})
err := sender.SendMessage(context.Background(), "test")
g.Expect(err).ToNot(HaveOccurred())
err = sender.SendMessageBatch(context.Background(), nil)
g.Expect(err).ToNot(HaveOccurred())
}

func TestSender_WithSendTimeout(t *testing.T) {
g := NewWithT(t)
sendTimeout := 5 * time.Second
azSender := &fakeAzSender{
DoSendMessage: func(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error {
dl, ok := ctx.Deadline()
g.Expect(ok).To(BeTrue())
g.Expect(dl).To(BeTemporally("~", time.Now().Add(sendTimeout), time.Second))
return nil
},
DoSendMessageBatch: func(ctx context.Context, messages *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error {
dl, ok := ctx.Deadline()
g.Expect(ok).To(BeTrue())
g.Expect(dl).To(BeTemporally("~", time.Now().Add(sendTimeout), time.Second))
return nil
},
}
sender := NewSender(azSender, &SenderOptions{
Marshaller: &DefaultJSONMarshaller{},
SendTimeout: sendTimeout,
})
err := sender.SendMessage(context.Background(), "test")
g.Expect(err).ToNot(HaveOccurred())
err = sender.SendMessageBatch(context.Background(), nil)
g.Expect(err).ToNot(HaveOccurred())
}

func TestSender_DisabledSendTimeout(t *testing.T) {
g := NewWithT(t)
sendTimeout := -1 * time.Second
azSender := &fakeAzSender{
DoSendMessage: func(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error {
_, ok := ctx.Deadline()
g.Expect(ok).To(BeFalse())
return nil
},
DoSendMessageBatch: func(ctx context.Context, messages *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error {
_, ok := ctx.Deadline()
g.Expect(ok).To(BeFalse())
return nil
},
}
sender := NewSender(azSender, &SenderOptions{
Marshaller: &DefaultJSONMarshaller{},
SendTimeout: sendTimeout,
})
err := sender.SendMessage(context.Background(), "test")
g.Expect(err).ToNot(HaveOccurred())
err = sender.SendMessageBatch(context.Background(), nil)
g.Expect(err).ToNot(HaveOccurred())
}

func TestSender_SendMessage(t *testing.T) {
azSender := &fakeAzSender{}
sender := NewSender(azSender, nil)
Expand Down Expand Up @@ -139,6 +216,8 @@ func TestSender_AzSender(t *testing.T) {
}

type fakeAzSender struct {
DoSendMessage func(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error
DoSendMessageBatch func(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error
SendMessageReceivedValue *azservicebus.Message
SendMessageReceivedCtx context.Context
SendMessageCalled bool
Expand All @@ -157,6 +236,11 @@ func (f *fakeAzSender) SendMessage(
f.SendMessageCalled = true
f.SendMessageReceivedValue = message
f.SendMessageReceivedCtx = ctx
if f.DoSendMessage != nil {
if err := f.DoSendMessage(ctx, message, options); err != nil {
return err
}
}
return f.SendMessageErr
}

Expand All @@ -166,6 +250,11 @@ func (f *fakeAzSender) SendMessageBatch(
options *azservicebus.SendMessageBatchOptions) error {
f.SendMessageBatchCalled = true
f.SendMessageBatchReceivedValue = batch
if f.DoSendMessageBatch != nil {
if err := f.DoSendMessageBatch(ctx, batch, options); err != nil {
return err
}
}
return f.SendMessageBatchErr
}

Expand Down

0 comments on commit d38b80f

Please sign in to comment.