Skip to content

Commit

Permalink
perf: handling store message job with workerpool
Browse files Browse the repository at this point in the history
  • Loading branch information
yudhasubki committed Dec 14, 2023
1 parent 4202704 commit e9106b3
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 57 deletions.
73 changes: 50 additions & 23 deletions blockqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,50 @@ package blockqueue

import (
"context"
"encoding/json"
"errors"
"io"
"log/slog"

"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/yudhasubki/blockqueue/pkg/cas"
"github.com/yudhasubki/blockqueue/pkg/core"
"github.com/yudhasubki/blockqueue/pkg/io"
bqio "github.com/yudhasubki/blockqueue/pkg/io"
"github.com/yudhasubki/eventpool"
)

var (
ErrJobNotFound = errors.New("job not found")
)

type BlockQueue[V chan io.ResponseMessages] struct {
type BlockQueue[V chan bqio.ResponseMessages] struct {
mtx *cas.SpinLock
serverCtx context.Context
jobs map[string]*Job[V]
kv *kv
pool *eventpool.Eventpool
}

func New[V chan io.ResponseMessages](bucket *kv) *BlockQueue[V] {
return &BlockQueue[V]{
func New[V chan bqio.ResponseMessages](bucket *kv) *BlockQueue[V] {
blockqueue := &BlockQueue[V]{
mtx: cas.New(),
jobs: make(map[string]*Job[V]),
kv: bucket,
}
pool := eventpool.New()
pool.Submit(eventpool.EventpoolListener{
Name: "store_job",
Subscriber: blockqueue.storeJob,
Opts: []eventpool.SubscriberConfigFunc{
eventpool.BufferSize(20000),
eventpool.MaxWorker(50),
},
})
pool.Run()
blockqueue.pool = pool

return blockqueue
}

func (q *BlockQueue[V]) Run(ctx context.Context) error {
Expand Down Expand Up @@ -108,33 +125,26 @@ func (q *BlockQueue[V]) ackMessage(ctx context.Context, topic core.Topic, subscr
return job.ackMessage(ctx, topic, subscriberName, messageId)
}

func (q *BlockQueue[V]) publish(ctx context.Context, topic core.Topic, request io.Publish) error {
job, exist := q.getJob(topic)
func (q *BlockQueue[V]) publish(ctx context.Context, topic core.Topic, request bqio.Publish) error {
_, exist := q.getJob(topic)
if !exist {
return ErrJobNotFound
}

err := tx(ctx, func(ctx context.Context, tx *sqlx.Tx) error {
return createMessages(ctx, core.Message{
Id: uuid.New(),
TopicId: topic.Id,
Message: request.Message,
Status: core.MessageStatusWaiting,
})
})
if err != nil {
return err
}

job.trigger()
q.pool.Publish(eventpool.SendJson(core.Message{
Id: uuid.New(),
TopicId: topic.Id,
Message: request.Message,
Status: core.MessageStatusWaiting,
}))

return nil
}

func (q *BlockQueue[V]) getSubscribersStatus(ctx context.Context, topic core.Topic) (io.SubscriberMessages, error) {
func (q *BlockQueue[V]) getSubscribersStatus(ctx context.Context, topic core.Topic) (bqio.SubscriberMessages, error) {
job, exist := q.getJob(topic)
if !exist {
return io.SubscriberMessages{}, ErrJobNotFound
return bqio.SubscriberMessages{}, ErrJobNotFound
}

return job.getListenersStatus(ctx, topic)
Expand Down Expand Up @@ -170,10 +180,10 @@ func (q *BlockQueue[V]) deleteSubscriber(ctx context.Context, topic core.Topic,
return job.deleteListener(ctx, topic, subcriber)
}

func (q *BlockQueue[V]) readSubscriberMessage(ctx context.Context, topic core.Topic, subscriber string) (io.ResponseMessages, error) {
func (q *BlockQueue[V]) readSubscriberMessage(ctx context.Context, topic core.Topic, subscriber string) (bqio.ResponseMessages, error) {
job, exist := q.jobs[topic.Name]
if !exist {
return io.ResponseMessages{}, ErrJobNotFound
return bqio.ResponseMessages{}, ErrJobNotFound
}

return job.enqueue(ctx, topic, subscriber)
Expand All @@ -187,3 +197,20 @@ func (q *BlockQueue[V]) getJob(topic core.Topic) (*Job[V], bool) {

return job, true
}

func (q *BlockQueue[V]) storeJob(name string, message io.Reader) error {
var request core.Message
err := json.NewDecoder(message).Decode(&request)
if err != nil {
return err
}

return tx(context.Background(), func(ctx context.Context, tx *sqlx.Tx) error {

return createMessages(ctx, request)
})
}

func (q *BlockQueue[V]) Close() {
q.pool.Close()
}
5 changes: 5 additions & 0 deletions cmd/blockqueue/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"os/signal"
"syscall"
"time"

"github.com/go-chi/chi/v5"
"github.com/lesismal/nbio/nbhttp"
Expand Down Expand Up @@ -83,10 +84,14 @@ func (h *Http) Run(ctx context.Context, args []string) error {
<-shutdown

cancel()
stream.Close()
engine.Stop()
sqlite.Close()
etcd.Close()

// handling graceful shutdown
time.Sleep(cfg.Http.Shutdown)

return nil
}

Expand Down
7 changes: 6 additions & 1 deletion cmd/blockqueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func ReadConfigFile(filename string) (_ Config, err error) {
return config, err
}

if config.Http.Shutdown.Seconds() == 0 {
config.Http.Shutdown = 30 * time.Second
}

logOutput := os.Stdout
if config.Logging.Stderr {
logOutput = os.Stderr
Expand Down Expand Up @@ -118,7 +122,8 @@ func ReadConfigFile(filename string) (_ Config, err error) {
}

type HttpConfig struct {
Port string `yaml:"port"`
Port string `yaml:"port"`
Shutdown time.Duration `yaml:"shutdown"`
}

func register(fs *flag.FlagSet) *string {
Expand Down
1 change: 1 addition & 0 deletions config.yaml.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
http:
port: 8080
shutdown: "30s"
logging:
level: "debug"
type: "json"
Expand Down
10 changes: 3 additions & 7 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (job *Job[V]) remove() {

func (job *Job[V]) fetchWaitingJob() {
for {
ticker := time.NewTicker(1 * time.Second)
select {
case <-job.ctx.Done():
slog.Info(
Expand All @@ -288,12 +289,7 @@ func (job *Job[V]) fetchWaitingJob() {
job.close()

return
case <-job.message:
slog.Debug(
"push job to the consumer bucket",
logPrefixTopic, job.Name,
)

case <-ticker.C:
err := job.dispatchJob()
if err != nil {
slog.Error(
Expand All @@ -312,7 +308,7 @@ func (job *Job[V]) dispatchJob() error {
TopicId: []uuid.UUID{job.Id},
Status: []core.MessageStatus{core.MessageStatusWaiting},
Offset: 1,
Limit: 10,
Limit: 100,
})
if err != nil {
slog.Error(
Expand Down
36 changes: 10 additions & 26 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,9 @@ func (listener *Listener[V]) deleteRetryMessage(id string) error {
return ErrListenerRetryMessageNotFound
}

func (listener *Listener[V]) messages() (bucket.MessageCounter, error) {
var (
unpublishMessage = 0
unackMessage = 0
)

err := listener.kv.readBucketTx(func(tx *nutsdb.Tx) error {
func (listener *Listener[V]) messages() (counter bucket.MessageCounter, err error) {
counter.Name = listener.Id
err = listener.kv.readBucketTx(func(tx *nutsdb.Tx) error {
size, err := tx.LSize(listener.JobId, listener.messageBucket())
if err != nil {
if errors.Is(err, nutsdb.ErrListNotFound) {
Expand All @@ -230,14 +226,14 @@ func (listener *Listener[V]) messages() (bucket.MessageCounter, error) {
return err
}

unpublishMessage = size
counter.UnpublishMessage = size
return nil
})
if err != nil {
return bucket.MessageCounter{}, err
}

err = listener.kv.readBucketTx(func(tx *nutsdb.Tx) error {
return counter, listener.kv.readBucketTx(func(tx *nutsdb.Tx) error {
size, err := tx.LSize(listener.JobId, listener.retryBucket())
if err != nil {
if errors.Is(err, nutsdb.ErrListNotFound) {
Expand All @@ -247,18 +243,9 @@ func (listener *Listener[V]) messages() (bucket.MessageCounter, error) {
return err
}

unackMessage = size
counter.UnackMessage = size
return nil
})
if err != nil {
return bucket.MessageCounter{}, err
}

return bucket.MessageCounter{
Name: listener.Id,
UnpublishMessage: unpublishMessage,
UnackMessage: unackMessage,
}, nil
}

func (listener *Listener[V]) enqueue(messages chan blockio.ResponseMessages) string {
Expand Down Expand Up @@ -464,7 +451,6 @@ func (listener *Listener[V]) jobCatcher(name string, message io.Reader) error {

err = backoff.Retry(func() error {
return listener.kv.updateBucketTx(func(tx *nutsdb.Tx) error {
messageBytes := make([][]byte, 0)
for idx, message := range messages {
var (
id = fmt.Sprintf("%d_%d", prefix, idx)
Expand All @@ -484,12 +470,10 @@ func (listener *Listener[V]) jobCatcher(name string, message io.Reader) error {
return err
}

messageBytes = append(messageBytes, b)
}

err = tx.RPush(listener.JobId, listener.messageBucket(), messageBytes...)
if err != nil {
return err
err = tx.RPush(listener.JobId, listener.messageBucket(), b)
if err != nil {
return err
}
}

return nil
Expand Down

0 comments on commit e9106b3

Please sign in to comment.