diff --git a/internal/test/test.go b/internal/test/test.go index 8c9f7d20..daebdb7a 100644 --- a/internal/test/test.go +++ b/internal/test/test.go @@ -176,5 +176,4 @@ func initTracing() (io.Closer, error) { return nil, err } return closer, nil - } diff --git a/v2/e2e/e2e_test.go b/v2/e2e/e2e_test.go new file mode 100644 index 00000000..92dc1574 --- /dev/null +++ b/v2/e2e/e2e_test.go @@ -0,0 +1,79 @@ +package e2e + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/stretchr/testify/require" + + v2 "github.com/Azure/go-shuttle/v2" +) + +// TestPublishAndListenWithConnectionStringUsingDefault tests both the publisher and listener with default configurations +func (s *SBSuite) TestPublishAndListen_ConcurrentLockRenewal() { + t := s.T() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + topicName := s.ApplyPrefix("default-topic") + subscriptionName := "sub" + s.EnsureTopic(ctx, t, topicName) + s.EnsureTopicSubscription(ctx, t, topicName, subscriptionName) + success := make(chan bool) + sendCount := 25 + go func() { + t.Logf("creating receiver...") + receiver, err := s.sbClient.NewReceiverForSubscription(topicName, subscriptionName, nil) + require.NoError(t, err) + lockRenewalInterval := 2 * time.Second + p := v2.NewProcessor(receiver, + v2.NewPanicHandler( + v2.NewRenewLockHandler(receiver, &lockRenewalInterval, + testHandler(t, success, sendCount))), &v2.ProcessorOptions{MaxConcurrency: 25}) + + t.Logf("start processor...") + err = p.Start(ctx) + t.Logf("processor exited: %s", err) + require.EqualError(t, err, context.DeadlineExceeded.Error()) + }() + + t.Logf("creating sender...") + sender, err := s.sbClient.NewSender(topicName, nil) + require.NoError(t, err) + t.Logf("sending message...") + for i := 0; i < sendCount; i++ { + err = sender.SendMessage(ctx, &azservicebus.Message{ + Body: []byte("{'value':'some message'}"), + }, nil) + require.NoError(t, err) + t.Logf("message %d sent...", i) + } + select { + case ok := <-success: + require.True(t, ok) + case <-ctx.Done(): + t.Errorf("did not complete the message in time") + } +} + +func testHandler(t *testing.T, success chan bool, expectedCount int) v2.HandlerFunc { + var count uint32 + return func(ctx context.Context, settler v2.MessageSettler, message *azservicebus.ReceivedMessage) { + t.Logf("Processing message.\n Delivery Count: %d\n", message.DeliveryCount) + t.Logf("ID: %s - Locked Until: %s\n", message.MessageID, message.LockedUntil) + t.Logf("sleeping...") + atomic.AddUint32(&count, 1) + time.Sleep(12 * time.Second) + t.Logf("completing message...") + err := settler.CompleteMessage(ctx, message, nil) + t.Logf("completed %d messages!", count) + if err != nil { + success <- false + } + if count == uint32(expectedCount) { + success <- true + } + } +} diff --git a/v2/e2e/suite_test.go b/v2/e2e/suite_test.go new file mode 100644 index 00000000..2e859ee5 --- /dev/null +++ b/v2/e2e/suite_test.go @@ -0,0 +1,178 @@ +package e2e + +import ( + "context" + "fmt" + "io" + "math/rand" + "os" + "syscall" + "testing" + "time" + + "github.com/Azure/azure-amqp-common-go/v3/conn" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + azadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" + "github.com/joho/godotenv" + "github.com/opentracing/opentracing-go" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/uber/jaeger-client-go/config" + jaegerlog "github.com/uber/jaeger-client-go/log" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/multierr" +) + +var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz123456789") + +type SBSuite struct { + suite.Suite + Prefix string + TenantID string + SubscriptionID string + ClientID string + ClientSecret string + ConnStr string + Location string + Namespace string + ResourceGroup string + TagID string + closer io.Closer + sbAdminClient *azadmin.Client + sbClient *azservicebus.Client +} + +func (s *SBSuite) GetSender(queueOrTopic string) (*azservicebus.Sender, error) { + // prefix the queue/topic + return s.sbClient.NewSender(queueOrTopic, nil) +} + +func init() { + rand.Seed(time.Now().Unix()) +} + +// randomString generates a random string with prefix +func randomString(prefix string, length int) string { + b := make([]rune, length) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return prefix + string(b) +} + +func TestSuite(t *testing.T) { + suite.Run(t, &SBSuite{Prefix: "v5"}) +} + +func (s *SBSuite) InitFromEnv() error { + setFromEnv := func(key string, target *string) error { + v := os.Getenv(key) + if v == "" { + return fmt.Errorf("missing environment variable - %q required for integration tests", key) + } + *target = v + return nil + } + return multierr.Combine( + setFromEnv("AZURE_TENANT_ID", &s.TenantID), + setFromEnv("AZURE_SUBSCRIPTION_ID", &s.SubscriptionID), + setFromEnv("AZURE_CLIENT_ID", &s.ClientID), + setFromEnv("AZURE_CLIENT_SECRET", &s.ClientSecret), + setFromEnv("SERVICEBUS_CONNECTION_STRING", &s.ConnStr), + setFromEnv("TEST_RESOURCE_GROUP", &s.ResourceGroup), + setFromEnv("TEST_LOCATION", &s.Location)) +} + +func (s *SBSuite) SetupSuite() { + if err := godotenv.Load("../../.env"); err != nil { + s.T().Log(err) + } + s.T().Setenv("GOSHUTTLE_LOG", "ALL") + if os.Getenv("TRACING") == "1" { + _, err := initTracing() + if err != nil { + s.FailNow("failed to initialize tracing: %s", err) + } + } + err := s.InitFromEnv() + s.Require().NoErrorf(err, "Missing env variable to configure suite") + + parsed, err := conn.ParsedConnectionFromStr(s.ConnStr) + s.Require().NoErrorf(err, "connection string could not be parsed") + s.Namespace = parsed.Namespace + // suite.Token = suite.servicePrincipalToken() + s.TagID = randomString("tag", 5) + s.sbClient, err = azservicebus.NewClientFromConnectionString(s.ConnStr, nil) + s.Require().NoError(err) + s.sbAdminClient, err = azadmin.NewClientFromConnectionString(s.ConnStr, nil) + s.Require().NoError(err) +} + +func (s *SBSuite) ApplyPrefix(name string) string { + return fmt.Sprintf("%s-%s", s.Prefix, name) +} + +func (s *SBSuite) EnsureTopic(ctx context.Context, t *testing.T, name string) { + topic, err := s.sbAdminClient.GetTopic(ctx, name, nil) + require.NoError(t, err) + if topic == nil { + createResponse, err := s.sbAdminClient.CreateTopic(ctx, name, &azadmin.CreateTopicOptions{ + Properties: &azadmin.TopicProperties{}, + }) + require.NoError(t, err) + t.Logf("topic created: %v", createResponse.Status) + return + } + updateResponse, err := s.sbAdminClient.UpdateTopic(ctx, name, azadmin.TopicProperties{}, nil) + require.NoError(t, err) + t.Logf("topic updated: %v", updateResponse.Status) +} + +func (s *SBSuite) EnsureTopicSubscription(ctx context.Context, t *testing.T, topicName, subscriptionName string) { + sub, err := s.sbAdminClient.GetSubscription(ctx, topicName, subscriptionName, nil) + require.NoError(t, err) + if sub == nil { + createResponse, err := s.sbAdminClient.CreateSubscription(ctx, topicName, subscriptionName, &azadmin.CreateSubscriptionOptions{ + Properties: &azadmin.SubscriptionProperties{ + LockDuration: to.Ptr("PT10S"), + }, + }) + require.NoError(t, err) + t.Logf("subscription created: %v", createResponse.Status) + return + } + updateResponse, err := s.sbAdminClient.UpdateSubscription(ctx, topicName, subscriptionName, azadmin.SubscriptionProperties{ + LockDuration: to.Ptr("PT10S"), + }, nil) + require.NoError(t, err) + t.Logf("subscription updated: %v", updateResponse.Status) +} + +func (s *SBSuite) TearDownSuite() { + p, _ := os.FindProcess(syscall.Getpid()) + p.Signal(syscall.SIGINT) +} + +func (s *SBSuite) SetupTest() { + +} + +func (s *SBSuite) TearDownTest() { + +} + +func initTracing() (io.Closer, error) { + cfg, err := config.FromEnv() + if err != nil { + return nil, err + } + jLogger := jaegerlog.NullLogger + jMetricsFactory := metrics.NullFactory + tracer, closer, err := cfg.NewTracer(config.Logger(jLogger), config.Metrics(jMetricsFactory)) + opentracing.SetGlobalTracer(tracer) + if err != nil { + return nil, err + } + return closer, nil +} diff --git a/v2/go.mod b/v2/go.mod index 91615c6f..0b0a11ff 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -3,24 +3,37 @@ go 1.18 module github.com/Azure/go-shuttle/v2 require ( + github.com/Azure/azure-amqp-common-go/v3 v3.2.0 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3 + github.com/joho/godotenv v1.3.0 + github.com/opentracing/opentracing-go v1.2.0 github.com/stretchr/testify v1.7.0 + github.com/uber/jaeger-client-go v2.25.0+incompatible + github.com/uber/jaeger-lib v2.4.0+incompatible + go.uber.org/multierr v1.8.0 ) require ( github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect + github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang-jwt/jwt v3.2.1+incompatible // indirect github.com/google/uuid v1.1.1 // indirect github.com/kylelemons/godebug v1.1.0 // indirect + github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.1.1 // indirect + go.uber.org/atomic v1.7.0 // indirect golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 // indirect - golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect - golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect - golang.org/x/text v0.3.7 // indirect - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect + golang.org/x/net v0.2.0 // indirect + golang.org/x/sys v0.2.0 // indirect + golang.org/x/text v0.4.0 // indirect + gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + nhooyr.io/websocket v1.8.7 // indirect ) diff --git a/v2/go.sum b/v2/go.sum index 82069e70..56332674 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -1,3 +1,6 @@ +github.com/Azure/azure-amqp-common-go/v3 v3.2.0 h1:BK/3P4TW4z2HLD6G5tMlHRvptOxxi4s9ee5r8sdHBUs= +github.com/Azure/azure-amqp-common-go/v3 v3.2.0/go.mod h1:zN7QL/vfCsq3XQxQaTkg4ScO786CA2rQnZ1LXX7QryE= +github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0= @@ -6,42 +9,123 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5 github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3 h1:27HVgIcvrKkRs5eJzHnyZdt71/EyB3clkiJQB0qyIa8= github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3/go.mod h1:Eo6WMP/iw9sp06+v8y030eReUwX6sULn5i3fxCDWPag= +github.com/Azure/go-amqp v0.13.13/go.mod h1:D5ZrjQqB1dyp1A+G73xeL/kNn7D5qHJIIsNNps7YNmk= +github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= +github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA= +github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= +github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74= +github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= +github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE= +github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E= +github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= +github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= +github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bEn0jTI6LJU0mpw= +github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= +github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.2.0 h1:besgBTC8w8HjP6NzQdxwKH9Z5oQMZ24ThTrHp3cZ8eU= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= +github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI= github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U= +github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ= +github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= +go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 h1:Tgea0cVUD0ivh5ADBX4WwuI12DUd2to3nCYe2eayMIw= golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA= -golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU= +golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= -golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A= +golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= +nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/v2/processor.go b/v2/processor.go index d134fd6c..42a2348c 100644 --- a/v2/processor.go +++ b/v2/processor.go @@ -3,6 +3,8 @@ package v2 import ( "context" "errors" + "fmt" + "os" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" @@ -44,10 +46,12 @@ type Processor struct { // MaxConcurrency defaults to 1. Not setting MaxConcurrency, or setting it to 0 or a negative value will fallback to the default. // ReceiveInterval defaults to 2 seconds if not set. type ProcessorOptions struct { - MaxConcurrency int - ReceiveInterval *time.Duration + MaxConcurrency int + ReceiveInterval *time.Duration + EnrichContextFunc func(ctx context.Context, message *azservicebus.ReceivedMessage) } + func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor { opts := ProcessorOptions{ MaxConcurrency: 1, @@ -72,6 +76,7 @@ func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOpti // Start starts the processor and blocks until an error occurs or the context is canceled. func (p *Processor) Start(ctx context.Context) error { messages, err := p.receiver.ReceiveMessages(ctx, p.options.MaxConcurrency, nil) + log("received ", len(messages), " messages - initial") if err != nil { return err } @@ -82,7 +87,11 @@ func (p *Processor) Start(ctx context.Context) error { select { case <-time.After(*p.options.ReceiveInterval): maxMessages := p.options.MaxConcurrency - len(p.concurrencyTokens) + if ctx.Err() != nil || maxMessages == 0 { + break + } messages, err := p.receiver.ReceiveMessages(ctx, maxMessages, nil) + log("received ", len(messages), " messages from loop") if err != nil { return err } @@ -90,20 +99,22 @@ func (p *Processor) Start(ctx context.Context) error { p.process(ctx, msg) } case <-ctx.Done(): + log("context done, stop receiving") break } } + log("exiting processor") return ctx.Err() } func (p *Processor) process(ctx context.Context, message *azservicebus.ReceivedMessage) { p.concurrencyTokens <- struct{}{} go func() { + msgContext, cancel := context.WithCancel(ctx) + defer cancel() defer func() { <-p.concurrencyTokens }() - msgContext, cancel := context.WithCancel(ctx) - defer cancel() p.handle(msgContext, p.receiver, message) }() } @@ -147,10 +158,12 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a for alive := true; alive; { select { case <-time.After(*plr.renewalInterval): + log("renewing lock") count++ // tab.For(ctx).Debug("Renewing message lock", tab.Int64Attribute("count", int64(count))) err := plr.lockRenewer.RenewMessageLock(ctx, message, nil) if err != nil { + log("ERROR failed to renew lock") // listener.Metrics.IncMessageLockRenewedFailure(message) // I don't think this is a problem. the context is canceled when the message processing is over. // this can happen if we already entered the interval case when the message is completing. @@ -160,6 +173,7 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a // tab.For(ctx).Debug("renewed lock success") // listener.Metrics.IncMessageLockRenewedSuccess(message) case <-ctx.Done(): + log("Context Done, stop lock renewal") // tab.For(ctx).Info("Stopping periodic renewal") err := ctx.Err() if errors.Is(err, context.DeadlineExceeded) { @@ -169,3 +183,10 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a } } } + +// dumb log until we integrate logging +func log(a ...any) { + if os.Getenv("GOSHUTTLE_LOG") == "ALL" { + fmt.Println(append(append([]any{}, time.Now().UTC(), " - "), a...)...) + } +} diff --git a/v2/processor_fake_test.go b/v2/processor_fake_test.go index 3448abe0..a6bac072 100644 --- a/v2/processor_fake_test.go +++ b/v2/processor_fake_test.go @@ -75,17 +75,3 @@ func (f *fakeReceiver) ReceiveMessages(_ context.Context, maxMessages int, _ *az return result, f.SetupReceiveError } - -func messagesChannel(messageCount int) chan *azservicebus.ReceivedMessage { - messages := make(chan *azservicebus.ReceivedMessage, messageCount) - for i := 0; i < messageCount; i++ { - messages <- &azservicebus.ReceivedMessage{} - } - return messages -} - -func enqueueCount(q chan *azservicebus.ReceivedMessage, messageCount int) { - for i := 0; i < messageCount; i++ { - q <- &azservicebus.ReceivedMessage{} - } -} diff --git a/v2/processor_test.go b/v2/processor_test.go index e96ad4bf..579187d0 100644 --- a/v2/processor_test.go +++ b/v2/processor_test.go @@ -134,18 +134,18 @@ func TestNewProcessor_ReceiveDeltaConcurrencyOnly(t *testing.T) { a.Error(err, "expect to exit with error because we consumed all configured messages") a.Equal(3, len(rcv.ReceiveCalls), "there should be 4 entry in the ReceiveCalls array") a.Equal(1, rcv.ReceiveCalls[0], "the processor should have used max concurrency of 1 initially") - a.Equal(0, rcv.ReceiveCalls[1], "the processor has no go-routine available") + a.Equal(1, rcv.ReceiveCalls[1], "the processor should receive 1 when the previous message is done processing and exit") a.Equal(1, rcv.ReceiveCalls[2], "the processor should receive 1 when the previous message is done processing and exit") } func TestNewProcessor_ReceiveDelta(t *testing.T) { // with an message processing that takes 10ms and an interval polling every 20 ms, - // we should call receive exactly 3 times to consume all the messages. + // we should call receive exactly 2 times to consume all the messages. a := require.New(t) rcv := &fakeReceiver{ fakeSettler: &fakeSettler{}, SetupReceivedMessages: messagesChannel(5), - SetupMaxReceiveCalls: 3, + SetupMaxReceiveCalls: 2, } processor := v2.NewProcessor(rcv, MyHandler(1*time.Second), &v2.ProcessorOptions{ MaxConcurrency: 10, @@ -166,8 +166,21 @@ func TestNewProcessor_ReceiveDelta(t *testing.T) { close(rcv.SetupReceivedMessages) <-done a.Error(processorError, "expect to exit with error because we consumed all configured messages") - a.Equal(3, len(rcv.ReceiveCalls), "should be called 3 times") + a.Equal(2, len(rcv.ReceiveCalls), "should be called 3 times") a.Equal(10, rcv.ReceiveCalls[0], "the processor should have used max concurrency of 10 initially") a.Equal(5, rcv.ReceiveCalls[1], "the processor should request 5 (delta)") - a.Equal(0, rcv.ReceiveCalls[2], "the processor should request 0 (delta)") +} + +func messagesChannel(messageCount int) chan *azservicebus.ReceivedMessage { + messages := make(chan *azservicebus.ReceivedMessage, messageCount) + for i := 0; i < messageCount; i++ { + messages <- &azservicebus.ReceivedMessage{} + } + return messages +} + +func enqueueCount(q chan *azservicebus.ReceivedMessage, messageCount int) { + for i := 0; i < messageCount; i++ { + q <- &azservicebus.ReceivedMessage{} + } }