Skip to content

Commit

Permalink
fix: respecting context cancelation since sdk does not (#153)
Browse files Browse the repository at this point in the history
* fix: respecting context cancelation since sdk does not
* fix: typo in error wrap
* adding missing comment
  • Loading branch information
imiller31 authored Aug 17, 2023
1 parent d38b80f commit b1eea4a
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 7 deletions.
36 changes: 32 additions & 4 deletions v2/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,23 @@ func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...fun
ctx, cancel = context.WithTimeout(ctx, d.options.SendTimeout)
defer cancel()
}
if err := d.sbSender.SendMessage(ctx, msg, nil); err != nil { // sendMessageOptions currently does nothing
return fmt.Errorf("failed to send message: %w", err)

errChan := make(chan error)

go func() {
if err := d.sbSender.SendMessage(ctx, msg, nil); err != nil { // sendMessageOptions currently does nothing
errChan <- fmt.Errorf("failed to send message: %w", err)
}
errChan <- nil
}()

select {
case <-ctx.Done():
return fmt.Errorf("failed to send message: %w", ctx.Err())
case err := <-errChan:
return err
}
return nil

}

// ToServiceBusMessage transform a MessageBody into an azservicebus.Message.
Expand Down Expand Up @@ -120,7 +133,22 @@ func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.
return fmt.Errorf("failed to send message batch: %w", err)
}

return nil
errChan := make(chan error)

go func() {
if err := d.sbSender.SendMessageBatch(ctx, batch, nil); err != nil {
errChan <- fmt.Errorf("failed to send message batch: %w", err)
}
errChan <- nil
}()

select {
case <-ctx.Done():
return fmt.Errorf("failed to send message batch: %w", ctx.Err())
case err := <-errChan:
return err
}

}

// AzSender returns the underlying azservicebus.Sender instance.
Expand Down
29 changes: 26 additions & 3 deletions v2/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import (
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"go.opentelemetry.io/otel/sdk/trace"

. "github.com/onsi/gomega"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
. "github.com/onsi/gomega"
)

func TestFunc_NewSender(t *testing.T) {
Expand Down Expand Up @@ -155,6 +154,30 @@ func TestSender_WithSendTimeout(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
}

func TestSender_WithContextCanceled(t *testing.T) {
g := NewWithT(t)
sendTimeout := 1 * time.Second
azSender := &fakeAzSender{
DoSendMessage: func(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error {
time.Sleep(2 * time.Second)
return nil
},
DoSendMessageBatch: func(ctx context.Context, messages *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error {
time.Sleep(2 * time.Second)
return nil
},
}
sender := NewSender(azSender, &SenderOptions{
Marshaller: &DefaultJSONMarshaller{},
SendTimeout: sendTimeout,
})

err := sender.SendMessage(context.Background(), "test")
g.Expect(err).To(MatchError(context.DeadlineExceeded))
err = sender.SendMessageBatch(context.Background(), nil)
g.Expect(err).To(MatchError(context.DeadlineExceeded))
}

func TestSender_DisabledSendTimeout(t *testing.T) {
g := NewWithT(t)
sendTimeout := -1 * time.Second
Expand Down

0 comments on commit b1eea4a

Please sign in to comment.