Skip to content

Commit

Permalink
test: unit testadd listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
yudhasubki committed Dec 14, 2023
1 parent c47b799 commit 5287ccf
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 14 deletions.
11 changes: 9 additions & 2 deletions blockqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,20 @@ func (q *BlockQueue[V]) getSubscribersStatus(ctx context.Context, topic core.Top
return job.getListenersStatus(ctx, topic)
}

func (q *BlockQueue[V]) addSubscriber(ctx context.Context, topic core.Topic) error {
func (q *BlockQueue[V]) addSubscriber(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error {
job, exist := q.getJob(topic)
if !exist {
return ErrJobNotFound
}

err := job.addListener(ctx, topic)
err := tx(ctx, func(ctx context.Context, tx *sqlx.Tx) error {
return createTxSubscribers(ctx, tx, subscribers)
})
if err != nil {
return err
}

err = job.addListener(ctx, topic)
if err != nil {
return err
}
Expand Down
50 changes: 50 additions & 0 deletions blockqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,57 @@ func TestBlockQueuePublishAndRead(t *testing.T) {
}, ErrJobNotFound)
})
})
}

func TestBlockQueueCreateSubscriber(t *testing.T) {
t.Run("success create subscriber", func(t *testing.T) {
runBlockQueueTest(t, func(bq *BlockQueue[chan bqio.ResponseMessages]) {
var (
serverCtx = context.Background()
request = bqio.Topic{
Name: getRandomChar(1),
Subscribers: bqio.Subscribers{
{
Name: getRandomChar(2),
},
},
}
topic = request.Topic()
subscribers = request.Subscriber(topic.Id)
)
bq.Run(serverCtx)

requestSubscriber := bqio.Subscribers{
{
Name: getRandomChar(3),
},
}

testAddJob(t, serverCtx, bq, topic, subscribers, nil)
testAddSubscriber(t, serverCtx, bq, topic, requestSubscriber.Subscriber(topic.Id), nil)
})
})

t.Run("failed create subscriber job not found", func(t *testing.T) {
runBlockQueueTest(t, func(bq *BlockQueue[chan bqio.ResponseMessages]) {
var (
serverCtx = context.Background()
request = bqio.Topic{
Name: getRandomChar(1),
}
topic = request.Topic()
)
bq.Run(serverCtx)

requestSubscriber := bqio.Subscribers{
{
Name: getRandomChar(3),
},
}

testAddSubscriber(t, serverCtx, bq, topic, requestSubscriber.Subscriber(topic.Id), ErrJobNotFound)
})
})
}

func getRandomChar(i int) string {
Expand Down
13 changes: 1 addition & 12 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/go-chi/chi/v5"
"github.com/jmoiron/sqlx"
"github.com/yudhasubki/blockqueue/pkg/core"
httpresponse "github.com/yudhasubki/blockqueue/pkg/http"
"github.com/yudhasubki/blockqueue/pkg/io"
Expand Down Expand Up @@ -200,18 +199,8 @@ func (h *Http) createSubscriber(w http.ResponseWriter, r *http.Request) {
}

subscribers := request.Subscriber(topic.Id)
err = tx(r.Context(), func(ctx context.Context, tx *sqlx.Tx) error {
return createTxSubscribers(ctx, tx, subscribers)
})
if err != nil {
httpresponse.Write(w, http.StatusInternalServerError, &httpresponse.Response{
Error: err.Error(),
Message: httpresponse.MessageFailure,
})
return
}

err = h.Stream.addSubscriber(r.Context(), topic)
err = h.Stream.addSubscriber(r.Context(), topic, subscribers)
if err != nil {
httpresponse.Write(w, http.StatusInternalServerError, &httpresponse.Response{
Error: err.Error(),
Expand Down
9 changes: 9 additions & 0 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ func testReadSubscriberMessage(t *testing.T, ctx context.Context, bq *BlockQueue
return io.ResponseMessage{}
}

func testAddSubscriber(t *testing.T, ctx context.Context, bq *BlockQueue[chan io.ResponseMessages], topic core.Topic, subscribers core.Subscribers, expectErr error) {
err := bq.addSubscriber(ctx, topic, subscribers)
if err != nil {
require.Equal(t, expectErr, err)
} else {
require.NoError(t, err)
}
}

func testDeleteSubscriber(t *testing.T, ctx context.Context, bq *BlockQueue[chan io.ResponseMessages], topic core.Topic, subscriberName string, expectErr error) {
err := bq.deleteSubscriber(ctx, topic, subscriberName)
if err != nil {
Expand Down

0 comments on commit 5287ccf

Please sign in to comment.