diff --git a/fxgcppubsub/module_test.go b/fxgcppubsub/module_test.go index b54784b..7c8834d 100644 --- a/fxgcppubsub/module_test.go +++ b/fxgcppubsub/module_test.go @@ -65,13 +65,8 @@ func TestFxGcpPubSubModule(t *testing.T) { fx.Populate(&publisher, &subscriber, &supervisor), ).RequireStart().RequireStop() - t.Run("raw message", func(t *testing.T) { - res, err := publisher.Publish(ctx, "raw-topic", []byte("test")) - assert.NotNil(t, res) - assert.NoError(t, err) - - sid, err := res.Get(ctx) - assert.NotEmpty(t, sid) + t.Run("raw message ack", func(t *testing.T) { + _, err := publisher.Publish(ctx, "raw-topic", []byte("test")) assert.NoError(t, err) publisher.Stop() @@ -89,17 +84,31 @@ func TestFxGcpPubSubModule(t *testing.T) { assert.NoError(t, err) }) - t.Run("avro message", func(t *testing.T) { - res, err := publisher.Publish(ctx, "avro-topic", &avro.SimpleRecord{ + t.Run("raw message nack", func(t *testing.T) { + _, err := publisher.Publish(ctx, "raw-topic", []byte("test")) + assert.NoError(t, err) + + publisher.Stop() + + waiter := supervisor.StartNackWaiter("raw-subscription") + + //nolint:errcheck + go subscriber.Subscribe(ctx, "raw-subscription", func(ctx context.Context, m *message.Message) { + assert.Equal(t, []byte("test"), m.Data()) + + m.Nack() + }) + + _, err = waiter.WaitMaxDuration(ctx, time.Second) + assert.NoError(t, err) + }) + + t.Run("avro message ack", func(t *testing.T) { + _, err := publisher.Publish(ctx, "avro-topic", &avro.SimpleRecord{ StringField: "test avro", FloatField: 12.34, BooleanField: true, }) - assert.NotNil(t, res) - assert.NoError(t, err) - - sid, err := res.Get(ctx) - assert.NotEmpty(t, sid) assert.NoError(t, err) publisher.Stop() @@ -124,17 +133,42 @@ func TestFxGcpPubSubModule(t *testing.T) { assert.NoError(t, err) }) - t.Run("proto message", func(t *testing.T) { - res, err := publisher.Publish(ctx, "proto-topic", &proto.SimpleRecord{ + t.Run("avro message nack", func(t *testing.T) { + _, err := publisher.Publish(ctx, "avro-topic", &avro.SimpleRecord{ + StringField: "test avro", + FloatField: 12.34, + BooleanField: true, + }) + assert.NoError(t, err) + + publisher.Stop() + + waiter := supervisor.StartNackWaiter("avro-subscription") + + //nolint:errcheck + go subscriber.Subscribe(ctx, "avro-subscription", func(ctx context.Context, m *message.Message) { + var out avro.SimpleRecord + + err = m.Decode(&out) + assert.NoError(t, err) + + assert.Equal(t, "test avro", out.StringField) + assert.Equal(t, float32(12.34), out.FloatField) + assert.True(t, out.BooleanField) + + m.Nack() + }) + + _, err = waiter.WaitMaxDuration(ctx, time.Second) + assert.NoError(t, err) + }) + + t.Run("proto message ack", func(t *testing.T) { + _, err := publisher.Publish(ctx, "proto-topic", &proto.SimpleRecord{ StringField: "test proto", FloatField: 56.78, BooleanField: false, }) - assert.NotNil(t, res) - assert.NoError(t, err) - - sid, err := res.Get(ctx) - assert.NotEmpty(t, sid) assert.NoError(t, err) publisher.Stop() @@ -158,4 +192,34 @@ func TestFxGcpPubSubModule(t *testing.T) { _, err = waiter.WaitMaxDuration(ctx, time.Second) assert.NoError(t, err) }) + + t.Run("proto message nack", func(t *testing.T) { + _, err := publisher.Publish(ctx, "proto-topic", &proto.SimpleRecord{ + StringField: "test proto", + FloatField: 56.78, + BooleanField: false, + }) + assert.NoError(t, err) + + publisher.Stop() + + waiter := supervisor.StartNackWaiter("proto-subscription") + + //nolint:errcheck + go subscriber.Subscribe(ctx, "proto-subscription", func(ctx context.Context, m *message.Message) { + var out proto.SimpleRecord + + err = m.Decode(&out) + assert.NoError(t, err) + + assert.Equal(t, "test proto", out.StringField) + assert.Equal(t, float32(56.78), out.FloatField) + assert.False(t, out.BooleanField) + + m.Nack() + }) + + _, err = waiter.WaitMaxDuration(ctx, time.Second) + assert.NoError(t, err) + }) } diff --git a/fxgcppubsub/reactor/ack/reactor.go b/fxgcppubsub/reactor/ack/reactor.go index 788a08a..65f2702 100644 --- a/fxgcppubsub/reactor/ack/reactor.go +++ b/fxgcppubsub/reactor/ack/reactor.go @@ -20,6 +20,7 @@ func NewAckReactor(supervisor AckSupervisor) *AckReactor { func (r *AckReactor) FuncNames() []string { return []string{ "Acknowledge", + "ModifyAckDeadline", } } @@ -29,5 +30,9 @@ func (r *AckReactor) React(req any) (bool, any, error) { r.supervisor.StopAckWaiter(ackReq.Subscription, ackReq.AckIds, nil) } + if ackReq, ok := req.(*pubsubpb.ModifyAckDeadlineRequest); ok { + r.supervisor.StopNackWaiter(ackReq.Subscription, ackReq.AckIds, nil) + } + return false, nil, nil } diff --git a/fxgcppubsub/reactor/ack/reactor_test.go b/fxgcppubsub/reactor/ack/reactor_test.go index f1292c5..abc6424 100644 --- a/fxgcppubsub/reactor/ack/reactor_test.go +++ b/fxgcppubsub/reactor/ack/reactor_test.go @@ -36,10 +36,17 @@ func TestAckReactor(t *testing.T) { react := ack.NewAckReactor(sup) t.Run("func names", func(t *testing.T) { - assert.Equal(t, []string{"Acknowledge"}, react.FuncNames()) + assert.Equal( + t, + []string{ + "Acknowledge", + "ModifyAckDeadline", + }, + react.FuncNames(), + ) }) - t.Run("react", func(t *testing.T) { + t.Run("react to ack", func(t *testing.T) { req := &pubsubpb.AcknowledgeRequest{ Subscription: subscription.NormalizeSubscriptionName("test-project", "test-subscription"), AckIds: []string{"test-id"}, @@ -59,4 +66,25 @@ func TestAckReactor(t *testing.T) { assert.NoError(t, err) assert.Equal(t, []string{"test-id"}, data) }) + + t.Run("react to nack", func(t *testing.T) { + req := &pubsubpb.ModifyAckDeadlineRequest{ + Subscription: subscription.NormalizeSubscriptionName("test-project", "test-subscription"), + AckIds: []string{"test-id"}, + } + + waiter := sup.StartNackWaiter("test-subscription") + + go func() { + rHandled, rRet, rErr := react.React(req) + + assert.False(t, rHandled) + assert.Nil(t, rRet) + assert.NoError(t, rErr) + }() + + data, err := waiter.WaitMaxDuration(context.Background(), 1*time.Millisecond) + assert.NoError(t, err) + assert.Equal(t, []string{"test-id"}, data) + }) } diff --git a/fxgcppubsub/reactor/ack/supervisor.go b/fxgcppubsub/reactor/ack/supervisor.go index ac5386d..e99eaa4 100644 --- a/fxgcppubsub/reactor/ack/supervisor.go +++ b/fxgcppubsub/reactor/ack/supervisor.go @@ -1,23 +1,35 @@ package ack import ( + "fmt" + "github.com/ankorstore/yokai-contrib/fxgcppubsub/reactor" "github.com/ankorstore/yokai-contrib/fxgcppubsub/subscription" "github.com/ankorstore/yokai/config" ) +const ( + Ack = "ack" + Nack = "nack" +) + var _ AckSupervisor = (*DefaultAckSupervisor)(nil) +// AckSupervisor is a reactor supervisor that reacts to acks ans nacks. type AckSupervisor interface { StartAckWaiter(subscriptionID string) *reactor.Waiter StopAckWaiter(subscriptionName string, ackIDs []string, err error) + StartNackWaiter(subscriptionID string) *reactor.Waiter + StopNackWaiter(subscriptionName string, ackIDs []string, err error) } +// DefaultAckSupervisor is the default AckSupervisor implementation. type DefaultAckSupervisor struct { supervisor reactor.WaiterSupervisor config *config.Config } +// NewDefaultAckSupervisor returns a new DefaultAckSupervisor instance. func NewDefaultAckSupervisor(supervisor reactor.WaiterSupervisor, config *config.Config) *DefaultAckSupervisor { return &DefaultAckSupervisor{ supervisor: supervisor, @@ -25,15 +37,35 @@ func NewDefaultAckSupervisor(supervisor reactor.WaiterSupervisor, config *config } } +// StartAckWaiter starts an ack waiter on a provided subscriptionID. func (s *DefaultAckSupervisor) StartAckWaiter(subscriptionID string) *reactor.Waiter { + return s.startWaiter(subscriptionID, Ack) +} + +// StopAckWaiter stop an ack waiter for a provided subscriptionName. +func (s *DefaultAckSupervisor) StopAckWaiter(subscriptionName string, ackIDs []string, err error) { + s.stopWaiter(subscriptionName, Ack, ackIDs, err) +} + +// StartNackWaiter starts a nack waiter on a provided subscriptionID. +func (s *DefaultAckSupervisor) StartNackWaiter(subscriptionID string) *reactor.Waiter { + return s.startWaiter(subscriptionID, Nack) +} + +// StopNackWaiter stop a nack waiter for a provided subscriptionName. +func (s *DefaultAckSupervisor) StopNackWaiter(subscriptionName string, ackIDs []string, err error) { + s.stopWaiter(subscriptionName, Nack, ackIDs, err) +} + +func (s *DefaultAckSupervisor) startWaiter(subscriptionID string, kind string) *reactor.Waiter { subscriptionName := subscription.NormalizeSubscriptionName( s.config.GetString("modules.gcppubsub.project.id"), subscriptionID, ) - return s.supervisor.StartWaiter(subscriptionName) + return s.supervisor.StartWaiter(fmt.Sprintf("%s::%s", kind, subscriptionName)) } -func (s *DefaultAckSupervisor) StopAckWaiter(subscriptionName string, ackIDs []string, err error) { - s.supervisor.StopWaiter(subscriptionName, ackIDs, err) +func (s *DefaultAckSupervisor) stopWaiter(subscriptionName string, kind string, ackIDs []string, err error) { + s.supervisor.StopWaiter(fmt.Sprintf("%s::%s", kind, subscriptionName), ackIDs, err) } diff --git a/fxgcppubsub/reactor/ack/supervisor_test.go b/fxgcppubsub/reactor/ack/supervisor_test.go index 7c908e2..a4b894c 100644 --- a/fxgcppubsub/reactor/ack/supervisor_test.go +++ b/fxgcppubsub/reactor/ack/supervisor_test.go @@ -68,4 +68,41 @@ func TestAckSupervisor(t *testing.T) { assert.Equal(t, assert.AnError, err) assert.Equal(t, []string{"test-id"}, data) }) + + t.Run("wait for nack", func(t *testing.T) { + waiter := supervisor.StartNackWaiter("test-subscription") + + go func() { + time.Sleep(1 * time.Millisecond) + + supervisor.StopNackWaiter( + subscription.NormalizeSubscriptionName("test-project", "test-subscription"), + []string{"test-id"}, + nil, + ) + }() + + data, err := waiter.WaitMaxDuration(context.Background(), 5*time.Millisecond) + assert.NoError(t, err) + assert.Equal(t, []string{"test-id"}, data) + }) + + t.Run("wait for nack with error", func(t *testing.T) { + waiter := supervisor.StartNackWaiter("test-subscription") + + go func() { + time.Sleep(1 * time.Millisecond) + + supervisor.StopNackWaiter( + subscription.NormalizeSubscriptionName("test-project", "test-subscription"), + []string{"test-id"}, + assert.AnError, + ) + }() + + data, err := waiter.WaitMaxDuration(context.Background(), 5*time.Millisecond) + assert.Error(t, err) + assert.Equal(t, assert.AnError, err) + assert.Equal(t, []string{"test-id"}, data) + }) } diff --git a/fxgcppubsub/reactor/log/reactor.go b/fxgcppubsub/reactor/log/reactor.go index 7b05c7d..d33dced 100644 --- a/fxgcppubsub/reactor/log/reactor.go +++ b/fxgcppubsub/reactor/log/reactor.go @@ -1,6 +1,8 @@ package log import ( + "fmt" + "github.com/ankorstore/yokai/log" ) @@ -49,7 +51,7 @@ func (r *LogReactor) FuncNames() []string { // React is the reactor logic. func (r *LogReactor) React(req any) (bool, any, error) { - r.logger.Debug().Interface("req", req).Msg("log reactor") + r.logger.Debug().Str("type", fmt.Sprintf("%T", req)).Interface("data", req).Msg("log reactor") return false, nil, nil } diff --git a/fxgcppubsub/reactor/log/reactor_test.go b/fxgcppubsub/reactor/log/reactor_test.go index 8f8f0e9..8663b89 100644 --- a/fxgcppubsub/reactor/log/reactor_test.go +++ b/fxgcppubsub/reactor/log/reactor_test.go @@ -3,6 +3,7 @@ package log_test import ( "testing" + "cloud.google.com/go/pubsub/apiv1/pubsubpb" "github.com/ankorstore/yokai-contrib/fxgcppubsub/reactor/log" yokailog "github.com/ankorstore/yokai/log" "github.com/ankorstore/yokai/log/logtest" @@ -61,7 +62,12 @@ func TestLogReactor(t *testing.T) { t.Run("react", func(t *testing.T) { t.Parallel() - rHandled, rRet, rErr := react.React("test") + req := &pubsubpb.AcknowledgeRequest{ + Subscription: "test-subscription", + AckIds: []string{"test-id"}, + } + + rHandled, rRet, rErr := react.React(req) assert.False(t, rHandled) assert.Nil(t, rRet) @@ -69,7 +75,8 @@ func TestLogReactor(t *testing.T) { logtest.AssertHasLogRecord(t, logBuffer, map[string]interface{}{ "level": "debug", - "req": "test", + "type": "*pubsubpb.AcknowledgeRequest", + "data": "map[ack_ids:[test-id] subscription:test-subscription]", "message": "log reactor", }) })