From 723fb11ca8d06d90dba5b7382a120c39799af9e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Erbrech?= Date: Tue, 4 Apr 2023 21:54:23 +1000 Subject: [PATCH] feat: Expose batch operations (#122) * expose batch operations * add tests * add more doc comments * fix imports --- v2/sender.go | 50 ++++++++++++++++++-- v2/sender_test.go | 114 +++++++++++++++++++++++++++++++++++++++++++-- v2/tracing_test.go | 8 ++-- 3 files changed, 161 insertions(+), 11 deletions(-) diff --git a/v2/sender.go b/v2/sender.go index cf1ac9a1..6cc5fdf8 100644 --- a/v2/sender.go +++ b/v2/sender.go @@ -19,6 +19,8 @@ type MessageBody any // AzServiceBusSender is satisfied by *azservicebus.Sender type AzServiceBusSender interface { SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error + SendMessageBatch(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error + NewMessageBatch(ctx context.Context, options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error) } // Sender contains an SBSender used to send the message to the ServiceBus queue and a Marshaller used to marshal any struct into a ServiceBus message @@ -43,13 +45,33 @@ func NewSender(sender AzServiceBusSender, options *SenderOptions) *Sender { return &Sender{sbSender: sender, options: options} } +// SendMessage sends a payload on the bus. +// the MessageBody is marshalled and set as the message body. func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...func(msg *azservicebus.Message) error) error { + msg, err := d.ToServiceBusMessage(ctx, mb, options...) + if err != nil { + return err + } + if err := d.sbSender.SendMessage(ctx, msg, nil); err != nil { // sendMessageOptions currently does nothing + return fmt.Errorf("failed to send message: %w", err) + } + return nil +} + +// ToServiceBusMessage transform a MessageBody into an azservicebus.Message. +// It marshals the body using the sender's configured marshaller, +// and set the bytes as the message.Body. +// the sender's configured options are applied to the azservicebus.Message before +// returning it. +func (d *Sender) ToServiceBusMessage( + ctx context.Context, + mb MessageBody, + options ...func(msg *azservicebus.Message) error) (*azservicebus.Message, error) { // uses a marshaller to marshal the message into a service bus message msg, err := d.options.Marshaller.Marshal(mb) if err != nil { - return fmt.Errorf("failed to marshal original struct into ServiceBus message: %w", err) + return nil, fmt.Errorf("failed to marshal original struct into ServiceBus message: %w", err) } - msgType := getMessageType(mb) msg.ApplicationProperties = map[string]interface{}{msgTypeField: msgType} @@ -59,17 +81,35 @@ func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...fun for _, option := range options { if err := option(msg); err != nil { - return fmt.Errorf("failed to run message options: %w", err) + return nil, fmt.Errorf("failed to run message options: %w", err) } } + return msg, nil +} - if err := d.sbSender.SendMessage(ctx, msg, nil); err != nil { // sendMessageOptions currently does nothing - return fmt.Errorf("failed to send message: %w", err) +// SendMessageBatch sends the array of azservicebus messages as a batch. +func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.Message) error { + batch, err := d.sbSender.NewMessageBatch(ctx, &azservicebus.MessageBatchOptions{}) + if err != nil { + return err + } + for _, msg := range messages { + if err := batch.AddMessage(msg, nil); err != nil { + return err + } + } + if err := d.sbSender.SendMessageBatch(ctx, batch, nil); err != nil { + return fmt.Errorf("failed to send message batch: %w", err) } return nil } +// AzSender returns the underlying azservicebus.Sender instance. +func (d *Sender) AzSender() AzServiceBusSender { + return d.sbSender +} + // SetMessageId sets the ServiceBus message's ID to a user-specified value func SetMessageId(messageId *string) func(msg *azservicebus.Message) error { return func(msg *azservicebus.Message) error { diff --git a/v2/sender_test.go b/v2/sender_test.go index 1463273d..b734a80a 100644 --- a/v2/sender_test.go +++ b/v2/sender_test.go @@ -1,10 +1,17 @@ package shuttle import ( + "context" + "fmt" "reflect" "testing" "time" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "go.opentelemetry.io/otel/sdk/trace" + + . "github.com/onsi/gomega" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" ) @@ -55,14 +62,115 @@ func TestHandlers_SetScheduleAt(t *testing.T) { } } +func TestHandlers_SetMessageDelay(t *testing.T) { + blankMsg := &azservicebus.Message{} + g := NewWithT(t) + option := SetMessageDelay(1 * time.Minute) + if err := option(blankMsg); err != nil { + t.Errorf("Unexpected error in set schedule at test: %s", err) + } + g.Expect(*blankMsg.ScheduledEnqueueTime).To(BeTemporally("~", time.Now().Add(1*time.Minute), time.Second)) +} + func TestHandlers_SetMessageTTL(t *testing.T) { blankMsg := &azservicebus.Message{} - ttl := time.Duration(10 * time.Second) - handler := SetMessageTTL(ttl) - if err := handler(blankMsg); err != nil { + ttl := 10 * time.Second + option := SetMessageTTL(ttl) + if err := option(blankMsg); err != nil { t.Errorf("Unexpected error in set message TTL at test: %s", err) } if *blankMsg.TimeToLive != ttl { t.Errorf("for message TTL at expected %s, got %s", ttl, *blankMsg.TimeToLive) } } + +func TestSender_SenderTracePropagation(t *testing.T) { + g := NewWithT(t) + azSender := &fakeAzSender{} + sender := NewSender(azSender, &SenderOptions{ + EnableTracingPropagation: true, + Marshaller: &DefaultJSONMarshaller{}, + }) + + tp := trace.NewTracerProvider(trace.WithSampler(trace.AlwaysSample())) + ctx, span := tp.Tracer("testTracer").Start( + context.Background(), + "receiver.Handle") + + msg, err := sender.ToServiceBusMessage(ctx, "test") + span.End() + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(msg.ApplicationProperties["traceparent"]).ToNot(BeNil()) +} + +func TestSender_SendMessage(t *testing.T) { + azSender := &fakeAzSender{} + sender := NewSender(azSender, nil) + err := sender.SendMessage(context.Background(), "test", SetMessageId(to.Ptr("messageID"))) + g := NewWithT(t) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(azSender.SendMessageCalled).To(BeTrue()) + g.Expect(string(azSender.SendMessageReceivedValue.Body)).To(Equal("\"test\"")) + g.Expect(*azSender.SendMessageReceivedValue.MessageID).To(Equal("messageID")) + + azSender.SendMessageErr = fmt.Errorf("msg send failure") + err = sender.SendMessage(context.Background(), "test") + g.Expect(err).To(And(HaveOccurred(), MatchError(azSender.SendMessageErr))) +} + +func TestSender_SendMessageBatch(t *testing.T) { + g := NewWithT(t) + azSender := &fakeAzSender{ + NewMessageBatchReturnValue: &azservicebus.MessageBatch{}, + } + sender := NewSender(azSender, nil) + msg, err := sender.ToServiceBusMessage(context.Background(), "test") + g.Expect(err).ToNot(HaveOccurred()) + err = sender.SendMessageBatch(context.Background(), []*azservicebus.Message{msg}) + g.Expect(err).To(HaveOccurred()) + // No way to create a MessageBatch struct with a non-0 max bytes in test, so the best we can do is expect an error. +} + +func TestSender_AzSender(t *testing.T) { + g := NewWithT(t) + azSender := &fakeAzSender{} + sender := NewSender(azSender, nil) + g.Expect(sender.AzSender()).To(Equal(azSender)) +} + +type fakeAzSender struct { + SendMessageReceivedValue *azservicebus.Message + SendMessageReceivedCtx context.Context + SendMessageCalled bool + SendMessageErr error + SendMessageBatchCalled bool + SendMessageBatchErr error + NewMessageBatchReturnValue *azservicebus.MessageBatch + NewMessageBatchErr error + SendMessageBatchReceivedValue *azservicebus.MessageBatch +} + +func (f *fakeAzSender) SendMessage( + ctx context.Context, + message *azservicebus.Message, + options *azservicebus.SendMessageOptions) error { + f.SendMessageCalled = true + f.SendMessageReceivedValue = message + f.SendMessageReceivedCtx = ctx + return f.SendMessageErr +} + +func (f *fakeAzSender) SendMessageBatch( + ctx context.Context, + batch *azservicebus.MessageBatch, + options *azservicebus.SendMessageBatchOptions) error { + f.SendMessageBatchCalled = true + f.SendMessageBatchReceivedValue = batch + return f.SendMessageBatchErr +} + +func (f *fakeAzSender) NewMessageBatch( + ctx context.Context, + options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error) { + return f.NewMessageBatchReturnValue, f.NewMessageBatchErr +} diff --git a/v2/tracing_test.go b/v2/tracing_test.go index 2bbca783..ae75ad58 100644 --- a/v2/tracing_test.go +++ b/v2/tracing_test.go @@ -5,13 +5,15 @@ import ( "reflect" "testing" - "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" - "github.com/Azure/go-shuttle/v2" - "github.com/Azure/go-shuttle/v2/otel" . "github.com/onsi/gomega" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "go.opentelemetry.io/otel/propagation" tracesdk "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" + + "github.com/Azure/go-shuttle/v2" + "github.com/Azure/go-shuttle/v2/otel" ) func TestHandlers_SetMessageTrace(t *testing.T) {