Skip to content

Commit

Permalink
feat: Expose batch operations (#122)
Browse files Browse the repository at this point in the history
* expose batch operations
* add tests
* add more doc comments
* fix imports
  • Loading branch information
serbrech authored Apr 4, 2023
1 parent 7258f5f commit 723fb11
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 11 deletions.
50 changes: 45 additions & 5 deletions v2/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type MessageBody any
// AzServiceBusSender is satisfied by *azservicebus.Sender
type AzServiceBusSender interface {
SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error
SendMessageBatch(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error
NewMessageBatch(ctx context.Context, options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error)
}

// Sender contains an SBSender used to send the message to the ServiceBus queue and a Marshaller used to marshal any struct into a ServiceBus message
Expand All @@ -43,13 +45,33 @@ func NewSender(sender AzServiceBusSender, options *SenderOptions) *Sender {
return &Sender{sbSender: sender, options: options}
}

// SendMessage sends a payload on the bus.
// the MessageBody is marshalled and set as the message body.
func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...func(msg *azservicebus.Message) error) error {
msg, err := d.ToServiceBusMessage(ctx, mb, options...)
if err != nil {
return err
}
if err := d.sbSender.SendMessage(ctx, msg, nil); err != nil { // sendMessageOptions currently does nothing
return fmt.Errorf("failed to send message: %w", err)
}
return nil
}

// ToServiceBusMessage transform a MessageBody into an azservicebus.Message.
// It marshals the body using the sender's configured marshaller,
// and set the bytes as the message.Body.
// the sender's configured options are applied to the azservicebus.Message before
// returning it.
func (d *Sender) ToServiceBusMessage(
ctx context.Context,
mb MessageBody,
options ...func(msg *azservicebus.Message) error) (*azservicebus.Message, error) {
// uses a marshaller to marshal the message into a service bus message
msg, err := d.options.Marshaller.Marshal(mb)
if err != nil {
return fmt.Errorf("failed to marshal original struct into ServiceBus message: %w", err)
return nil, fmt.Errorf("failed to marshal original struct into ServiceBus message: %w", err)
}

msgType := getMessageType(mb)
msg.ApplicationProperties = map[string]interface{}{msgTypeField: msgType}

Expand All @@ -59,17 +81,35 @@ func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...fun

for _, option := range options {
if err := option(msg); err != nil {
return fmt.Errorf("failed to run message options: %w", err)
return nil, fmt.Errorf("failed to run message options: %w", err)
}
}
return msg, nil
}

if err := d.sbSender.SendMessage(ctx, msg, nil); err != nil { // sendMessageOptions currently does nothing
return fmt.Errorf("failed to send message: %w", err)
// SendMessageBatch sends the array of azservicebus messages as a batch.
func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.Message) error {
batch, err := d.sbSender.NewMessageBatch(ctx, &azservicebus.MessageBatchOptions{})
if err != nil {
return err
}
for _, msg := range messages {
if err := batch.AddMessage(msg, nil); err != nil {
return err
}
}
if err := d.sbSender.SendMessageBatch(ctx, batch, nil); err != nil {
return fmt.Errorf("failed to send message batch: %w", err)
}

return nil
}

// AzSender returns the underlying azservicebus.Sender instance.
func (d *Sender) AzSender() AzServiceBusSender {
return d.sbSender
}

// SetMessageId sets the ServiceBus message's ID to a user-specified value
func SetMessageId(messageId *string) func(msg *azservicebus.Message) error {
return func(msg *azservicebus.Message) error {
Expand Down
114 changes: 111 additions & 3 deletions v2/sender_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package shuttle

import (
"context"
"fmt"
"reflect"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"go.opentelemetry.io/otel/sdk/trace"

. "github.com/onsi/gomega"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

Expand Down Expand Up @@ -55,14 +62,115 @@ func TestHandlers_SetScheduleAt(t *testing.T) {
}
}

func TestHandlers_SetMessageDelay(t *testing.T) {
blankMsg := &azservicebus.Message{}
g := NewWithT(t)
option := SetMessageDelay(1 * time.Minute)
if err := option(blankMsg); err != nil {
t.Errorf("Unexpected error in set schedule at test: %s", err)
}
g.Expect(*blankMsg.ScheduledEnqueueTime).To(BeTemporally("~", time.Now().Add(1*time.Minute), time.Second))
}

func TestHandlers_SetMessageTTL(t *testing.T) {
blankMsg := &azservicebus.Message{}
ttl := time.Duration(10 * time.Second)
handler := SetMessageTTL(ttl)
if err := handler(blankMsg); err != nil {
ttl := 10 * time.Second
option := SetMessageTTL(ttl)
if err := option(blankMsg); err != nil {
t.Errorf("Unexpected error in set message TTL at test: %s", err)
}
if *blankMsg.TimeToLive != ttl {
t.Errorf("for message TTL at expected %s, got %s", ttl, *blankMsg.TimeToLive)
}
}

func TestSender_SenderTracePropagation(t *testing.T) {
g := NewWithT(t)
azSender := &fakeAzSender{}
sender := NewSender(azSender, &SenderOptions{
EnableTracingPropagation: true,
Marshaller: &DefaultJSONMarshaller{},
})

tp := trace.NewTracerProvider(trace.WithSampler(trace.AlwaysSample()))
ctx, span := tp.Tracer("testTracer").Start(
context.Background(),
"receiver.Handle")

msg, err := sender.ToServiceBusMessage(ctx, "test")
span.End()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(msg.ApplicationProperties["traceparent"]).ToNot(BeNil())
}

func TestSender_SendMessage(t *testing.T) {
azSender := &fakeAzSender{}
sender := NewSender(azSender, nil)
err := sender.SendMessage(context.Background(), "test", SetMessageId(to.Ptr("messageID")))
g := NewWithT(t)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(azSender.SendMessageCalled).To(BeTrue())
g.Expect(string(azSender.SendMessageReceivedValue.Body)).To(Equal("\"test\""))
g.Expect(*azSender.SendMessageReceivedValue.MessageID).To(Equal("messageID"))

azSender.SendMessageErr = fmt.Errorf("msg send failure")
err = sender.SendMessage(context.Background(), "test")
g.Expect(err).To(And(HaveOccurred(), MatchError(azSender.SendMessageErr)))
}

func TestSender_SendMessageBatch(t *testing.T) {
g := NewWithT(t)
azSender := &fakeAzSender{
NewMessageBatchReturnValue: &azservicebus.MessageBatch{},
}
sender := NewSender(azSender, nil)
msg, err := sender.ToServiceBusMessage(context.Background(), "test")
g.Expect(err).ToNot(HaveOccurred())
err = sender.SendMessageBatch(context.Background(), []*azservicebus.Message{msg})
g.Expect(err).To(HaveOccurred())
// No way to create a MessageBatch struct with a non-0 max bytes in test, so the best we can do is expect an error.
}

func TestSender_AzSender(t *testing.T) {
g := NewWithT(t)
azSender := &fakeAzSender{}
sender := NewSender(azSender, nil)
g.Expect(sender.AzSender()).To(Equal(azSender))
}

type fakeAzSender struct {
SendMessageReceivedValue *azservicebus.Message
SendMessageReceivedCtx context.Context
SendMessageCalled bool
SendMessageErr error
SendMessageBatchCalled bool
SendMessageBatchErr error
NewMessageBatchReturnValue *azservicebus.MessageBatch
NewMessageBatchErr error
SendMessageBatchReceivedValue *azservicebus.MessageBatch
}

func (f *fakeAzSender) SendMessage(
ctx context.Context,
message *azservicebus.Message,
options *azservicebus.SendMessageOptions) error {
f.SendMessageCalled = true
f.SendMessageReceivedValue = message
f.SendMessageReceivedCtx = ctx
return f.SendMessageErr
}

func (f *fakeAzSender) SendMessageBatch(
ctx context.Context,
batch *azservicebus.MessageBatch,
options *azservicebus.SendMessageBatchOptions) error {
f.SendMessageBatchCalled = true
f.SendMessageBatchReceivedValue = batch
return f.SendMessageBatchErr
}

func (f *fakeAzSender) NewMessageBatch(
ctx context.Context,
options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error) {
return f.NewMessageBatchReturnValue, f.NewMessageBatchErr
}
8 changes: 5 additions & 3 deletions v2/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"reflect"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/go-shuttle/v2"
"github.com/Azure/go-shuttle/v2/otel"
. "github.com/onsi/gomega"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"go.opentelemetry.io/otel/propagation"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"

"github.com/Azure/go-shuttle/v2"
"github.com/Azure/go-shuttle/v2/otel"
)

func TestHandlers_SetMessageTrace(t *testing.T) {
Expand Down

0 comments on commit 723fb11

Please sign in to comment.