Skip to content

Commit

Permalink
v2 e2e base (#88)
Browse files Browse the repository at this point in the history
* timer pump

* set options correctly

* go mod tidy

* add few hairy unit tests

* move fakes to separate file

* improve e2e setup and test

* fix unit test after avoiding calls to Receive with 0 max message (deadlock)
  • Loading branch information
serbrech authored Dec 9, 2022
1 parent d3d280a commit 7497093
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 37 deletions.
1 change: 0 additions & 1 deletion internal/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,4 @@ func initTracing() (io.Closer, error) {
return nil, err
}
return closer, nil

}
79 changes: 79 additions & 0 deletions v2/e2e/e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package e2e

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/stretchr/testify/require"

v2 "github.com/Azure/go-shuttle/v2"
)

// TestPublishAndListenWithConnectionStringUsingDefault tests both the publisher and listener with default configurations
func (s *SBSuite) TestPublishAndListen_ConcurrentLockRenewal() {
t := s.T()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
topicName := s.ApplyPrefix("default-topic")
subscriptionName := "sub"
s.EnsureTopic(ctx, t, topicName)
s.EnsureTopicSubscription(ctx, t, topicName, subscriptionName)
success := make(chan bool)
sendCount := 25
go func() {
t.Logf("creating receiver...")
receiver, err := s.sbClient.NewReceiverForSubscription(topicName, subscriptionName, nil)
require.NoError(t, err)
lockRenewalInterval := 2 * time.Second
p := v2.NewProcessor(receiver,
v2.NewPanicHandler(
v2.NewRenewLockHandler(receiver, &lockRenewalInterval,
testHandler(t, success, sendCount))), &v2.ProcessorOptions{MaxConcurrency: 25})

t.Logf("start processor...")
err = p.Start(ctx)
t.Logf("processor exited: %s", err)
require.EqualError(t, err, context.DeadlineExceeded.Error())
}()

t.Logf("creating sender...")
sender, err := s.sbClient.NewSender(topicName, nil)
require.NoError(t, err)
t.Logf("sending message...")
for i := 0; i < sendCount; i++ {
err = sender.SendMessage(ctx, &azservicebus.Message{
Body: []byte("{'value':'some message'}"),
}, nil)
require.NoError(t, err)
t.Logf("message %d sent...", i)
}
select {
case ok := <-success:
require.True(t, ok)
case <-ctx.Done():
t.Errorf("did not complete the message in time")
}
}

func testHandler(t *testing.T, success chan bool, expectedCount int) v2.HandlerFunc {
var count uint32
return func(ctx context.Context, settler v2.MessageSettler, message *azservicebus.ReceivedMessage) {
t.Logf("Processing message.\n Delivery Count: %d\n", message.DeliveryCount)
t.Logf("ID: %s - Locked Until: %s\n", message.MessageID, message.LockedUntil)
t.Logf("sleeping...")
atomic.AddUint32(&count, 1)
time.Sleep(12 * time.Second)
t.Logf("completing message...")
err := settler.CompleteMessage(ctx, message, nil)
t.Logf("completed %d messages!", count)
if err != nil {
success <- false
}
if count == uint32(expectedCount) {
success <- true
}
}
}
178 changes: 178 additions & 0 deletions v2/e2e/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package e2e

import (
"context"
"fmt"
"io"
"math/rand"
"os"
"syscall"
"testing"
"time"

"github.com/Azure/azure-amqp-common-go/v3/conn"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
azadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/joho/godotenv"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber/jaeger-client-go/config"
jaegerlog "github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/multierr"
)

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz123456789")

type SBSuite struct {
suite.Suite
Prefix string
TenantID string
SubscriptionID string
ClientID string
ClientSecret string
ConnStr string
Location string
Namespace string
ResourceGroup string
TagID string
closer io.Closer
sbAdminClient *azadmin.Client
sbClient *azservicebus.Client
}

func (s *SBSuite) GetSender(queueOrTopic string) (*azservicebus.Sender, error) {
// prefix the queue/topic
return s.sbClient.NewSender(queueOrTopic, nil)
}

func init() {
rand.Seed(time.Now().Unix())
}

// randomString generates a random string with prefix
func randomString(prefix string, length int) string {
b := make([]rune, length)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return prefix + string(b)
}

func TestSuite(t *testing.T) {
suite.Run(t, &SBSuite{Prefix: "v5"})
}

func (s *SBSuite) InitFromEnv() error {
setFromEnv := func(key string, target *string) error {
v := os.Getenv(key)
if v == "" {
return fmt.Errorf("missing environment variable - %q required for integration tests", key)
}
*target = v
return nil
}
return multierr.Combine(
setFromEnv("AZURE_TENANT_ID", &s.TenantID),
setFromEnv("AZURE_SUBSCRIPTION_ID", &s.SubscriptionID),
setFromEnv("AZURE_CLIENT_ID", &s.ClientID),
setFromEnv("AZURE_CLIENT_SECRET", &s.ClientSecret),
setFromEnv("SERVICEBUS_CONNECTION_STRING", &s.ConnStr),
setFromEnv("TEST_RESOURCE_GROUP", &s.ResourceGroup),
setFromEnv("TEST_LOCATION", &s.Location))
}

func (s *SBSuite) SetupSuite() {
if err := godotenv.Load("../../.env"); err != nil {
s.T().Log(err)
}
s.T().Setenv("GOSHUTTLE_LOG", "ALL")
if os.Getenv("TRACING") == "1" {
_, err := initTracing()
if err != nil {
s.FailNow("failed to initialize tracing: %s", err)
}
}
err := s.InitFromEnv()
s.Require().NoErrorf(err, "Missing env variable to configure suite")

parsed, err := conn.ParsedConnectionFromStr(s.ConnStr)
s.Require().NoErrorf(err, "connection string could not be parsed")
s.Namespace = parsed.Namespace
// suite.Token = suite.servicePrincipalToken()
s.TagID = randomString("tag", 5)
s.sbClient, err = azservicebus.NewClientFromConnectionString(s.ConnStr, nil)
s.Require().NoError(err)
s.sbAdminClient, err = azadmin.NewClientFromConnectionString(s.ConnStr, nil)
s.Require().NoError(err)
}

func (s *SBSuite) ApplyPrefix(name string) string {
return fmt.Sprintf("%s-%s", s.Prefix, name)
}

func (s *SBSuite) EnsureTopic(ctx context.Context, t *testing.T, name string) {
topic, err := s.sbAdminClient.GetTopic(ctx, name, nil)
require.NoError(t, err)
if topic == nil {
createResponse, err := s.sbAdminClient.CreateTopic(ctx, name, &azadmin.CreateTopicOptions{
Properties: &azadmin.TopicProperties{},
})
require.NoError(t, err)
t.Logf("topic created: %v", createResponse.Status)
return
}
updateResponse, err := s.sbAdminClient.UpdateTopic(ctx, name, azadmin.TopicProperties{}, nil)
require.NoError(t, err)
t.Logf("topic updated: %v", updateResponse.Status)
}

func (s *SBSuite) EnsureTopicSubscription(ctx context.Context, t *testing.T, topicName, subscriptionName string) {
sub, err := s.sbAdminClient.GetSubscription(ctx, topicName, subscriptionName, nil)
require.NoError(t, err)
if sub == nil {
createResponse, err := s.sbAdminClient.CreateSubscription(ctx, topicName, subscriptionName, &azadmin.CreateSubscriptionOptions{
Properties: &azadmin.SubscriptionProperties{
LockDuration: to.Ptr("PT10S"),
},
})
require.NoError(t, err)
t.Logf("subscription created: %v", createResponse.Status)
return
}
updateResponse, err := s.sbAdminClient.UpdateSubscription(ctx, topicName, subscriptionName, azadmin.SubscriptionProperties{
LockDuration: to.Ptr("PT10S"),
}, nil)
require.NoError(t, err)
t.Logf("subscription updated: %v", updateResponse.Status)
}

func (s *SBSuite) TearDownSuite() {
p, _ := os.FindProcess(syscall.Getpid())
p.Signal(syscall.SIGINT)
}

func (s *SBSuite) SetupTest() {

}

func (s *SBSuite) TearDownTest() {

}

func initTracing() (io.Closer, error) {
cfg, err := config.FromEnv()
if err != nil {
return nil, err
}
jLogger := jaegerlog.NullLogger
jMetricsFactory := metrics.NullFactory
tracer, closer, err := cfg.NewTracer(config.Logger(jLogger), config.Metrics(jMetricsFactory))
opentracing.SetGlobalTracer(tracer)
if err != nil {
return nil, err
}
return closer, nil
}
21 changes: 17 additions & 4 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,37 @@ go 1.18
module github.com/Azure/go-shuttle/v2

require (
github.com/Azure/azure-amqp-common-go/v3 v3.2.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3
github.com/joho/godotenv v1.3.0
github.com/opentracing/opentracing-go v1.2.0
github.com/stretchr/testify v1.7.0
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.4.0+incompatible
go.uber.org/multierr v1.8.0
)

require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang-jwt/jwt v3.2.1+incompatible // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.1.1 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
nhooyr.io/websocket v1.8.7 // indirect
)
Loading

0 comments on commit 7497093

Please sign in to comment.