diff --git a/blockqueue.go b/blockqueue.go index 834fd30..e610726 100644 --- a/blockqueue.go +++ b/blockqueue.go @@ -2,33 +2,50 @@ package blockqueue import ( "context" + "encoding/json" "errors" + "io" "log/slog" "github.com/google/uuid" "github.com/jmoiron/sqlx" "github.com/yudhasubki/blockqueue/pkg/cas" "github.com/yudhasubki/blockqueue/pkg/core" - "github.com/yudhasubki/blockqueue/pkg/io" + bqio "github.com/yudhasubki/blockqueue/pkg/io" + "github.com/yudhasubki/eventpool" ) var ( ErrJobNotFound = errors.New("job not found") ) -type BlockQueue[V chan io.ResponseMessages] struct { +type BlockQueue[V chan bqio.ResponseMessages] struct { mtx *cas.SpinLock serverCtx context.Context jobs map[string]*Job[V] kv *kv + pool *eventpool.Eventpool } -func New[V chan io.ResponseMessages](bucket *kv) *BlockQueue[V] { - return &BlockQueue[V]{ +func New[V chan bqio.ResponseMessages](bucket *kv) *BlockQueue[V] { + blockqueue := &BlockQueue[V]{ mtx: cas.New(), jobs: make(map[string]*Job[V]), kv: bucket, } + pool := eventpool.New() + pool.Submit(eventpool.EventpoolListener{ + Name: "store_job", + Subscriber: blockqueue.storeJob, + Opts: []eventpool.SubscriberConfigFunc{ + eventpool.BufferSize(20000), + eventpool.MaxWorker(50), + }, + }) + pool.Run() + blockqueue.pool = pool + + return blockqueue } func (q *BlockQueue[V]) Run(ctx context.Context) error { @@ -108,33 +125,26 @@ func (q *BlockQueue[V]) ackMessage(ctx context.Context, topic core.Topic, subscr return job.ackMessage(ctx, topic, subscriberName, messageId) } -func (q *BlockQueue[V]) publish(ctx context.Context, topic core.Topic, request io.Publish) error { - job, exist := q.getJob(topic) +func (q *BlockQueue[V]) publish(ctx context.Context, topic core.Topic, request bqio.Publish) error { + _, exist := q.getJob(topic) if !exist { return ErrJobNotFound } - err := tx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { - return createMessages(ctx, core.Message{ - Id: uuid.New(), - TopicId: topic.Id, - Message: request.Message, - Status: core.MessageStatusWaiting, - }) - }) - if err != nil { - return err - } - - job.trigger() + q.pool.Publish(eventpool.SendJson(core.Message{ + Id: uuid.New(), + TopicId: topic.Id, + Message: request.Message, + Status: core.MessageStatusWaiting, + })) return nil } -func (q *BlockQueue[V]) getSubscribersStatus(ctx context.Context, topic core.Topic) (io.SubscriberMessages, error) { +func (q *BlockQueue[V]) getSubscribersStatus(ctx context.Context, topic core.Topic) (bqio.SubscriberMessages, error) { job, exist := q.getJob(topic) if !exist { - return io.SubscriberMessages{}, ErrJobNotFound + return bqio.SubscriberMessages{}, ErrJobNotFound } return job.getListenersStatus(ctx, topic) @@ -170,10 +180,10 @@ func (q *BlockQueue[V]) deleteSubscriber(ctx context.Context, topic core.Topic, return job.deleteListener(ctx, topic, subcriber) } -func (q *BlockQueue[V]) readSubscriberMessage(ctx context.Context, topic core.Topic, subscriber string) (io.ResponseMessages, error) { +func (q *BlockQueue[V]) readSubscriberMessage(ctx context.Context, topic core.Topic, subscriber string) (bqio.ResponseMessages, error) { job, exist := q.jobs[topic.Name] if !exist { - return io.ResponseMessages{}, ErrJobNotFound + return bqio.ResponseMessages{}, ErrJobNotFound } return job.enqueue(ctx, topic, subscriber) @@ -187,3 +197,20 @@ func (q *BlockQueue[V]) getJob(topic core.Topic) (*Job[V], bool) { return job, true } + +func (q *BlockQueue[V]) storeJob(name string, message io.Reader) error { + var request core.Message + err := json.NewDecoder(message).Decode(&request) + if err != nil { + return err + } + + return tx(context.Background(), func(ctx context.Context, tx *sqlx.Tx) error { + + return createMessages(ctx, request) + }) +} + +func (q *BlockQueue[V]) Close() { + q.pool.Close() +} diff --git a/cmd/blockqueue/http.go b/cmd/blockqueue/http.go index a7198a9..a5e9805 100644 --- a/cmd/blockqueue/http.go +++ b/cmd/blockqueue/http.go @@ -7,6 +7,7 @@ import ( "log/slog" "os/signal" "syscall" + "time" "github.com/go-chi/chi/v5" "github.com/lesismal/nbio/nbhttp" @@ -83,10 +84,14 @@ func (h *Http) Run(ctx context.Context, args []string) error { <-shutdown cancel() + stream.Close() engine.Stop() sqlite.Close() etcd.Close() + // handling graceful shutdown + time.Sleep(cfg.Http.Shutdown) + return nil } diff --git a/cmd/blockqueue/main.go b/cmd/blockqueue/main.go index 2e4b377..5709d0d 100644 --- a/cmd/blockqueue/main.go +++ b/cmd/blockqueue/main.go @@ -86,6 +86,10 @@ func ReadConfigFile(filename string) (_ Config, err error) { return config, err } + if config.Http.Shutdown.Seconds() == 0 { + config.Http.Shutdown = 30 * time.Second + } + logOutput := os.Stdout if config.Logging.Stderr { logOutput = os.Stderr @@ -118,7 +122,8 @@ func ReadConfigFile(filename string) (_ Config, err error) { } type HttpConfig struct { - Port string `yaml:"port"` + Port string `yaml:"port"` + Shutdown time.Duration `yaml:"shutdown"` } func register(fs *flag.FlagSet) *string { diff --git a/config.yaml.example b/config.yaml.example index f5bf19b..3385f7c 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -1,5 +1,6 @@ http: port: 8080 + shutdown: "30s" logging: level: "debug" type: "json" diff --git a/job.go b/job.go index 06e1f7c..a866b3e 100644 --- a/job.go +++ b/job.go @@ -278,6 +278,7 @@ func (job *Job[V]) remove() { func (job *Job[V]) fetchWaitingJob() { for { + ticker := time.NewTicker(1 * time.Second) select { case <-job.ctx.Done(): slog.Info( @@ -288,12 +289,7 @@ func (job *Job[V]) fetchWaitingJob() { job.close() return - case <-job.message: - slog.Debug( - "push job to the consumer bucket", - logPrefixTopic, job.Name, - ) - + case <-ticker.C: err := job.dispatchJob() if err != nil { slog.Error( @@ -312,7 +308,7 @@ func (job *Job[V]) dispatchJob() error { TopicId: []uuid.UUID{job.Id}, Status: []core.MessageStatus{core.MessageStatusWaiting}, Offset: 1, - Limit: 10, + Limit: 100, }) if err != nil { slog.Error( diff --git a/listener.go b/listener.go index d863465..043c7e2 100644 --- a/listener.go +++ b/listener.go @@ -215,13 +215,9 @@ func (listener *Listener[V]) deleteRetryMessage(id string) error { return ErrListenerRetryMessageNotFound } -func (listener *Listener[V]) messages() (bucket.MessageCounter, error) { - var ( - unpublishMessage = 0 - unackMessage = 0 - ) - - err := listener.kv.readBucketTx(func(tx *nutsdb.Tx) error { +func (listener *Listener[V]) messages() (counter bucket.MessageCounter, err error) { + counter.Name = listener.Id + err = listener.kv.readBucketTx(func(tx *nutsdb.Tx) error { size, err := tx.LSize(listener.JobId, listener.messageBucket()) if err != nil { if errors.Is(err, nutsdb.ErrListNotFound) { @@ -230,14 +226,14 @@ func (listener *Listener[V]) messages() (bucket.MessageCounter, error) { return err } - unpublishMessage = size + counter.UnpublishMessage = size return nil }) if err != nil { return bucket.MessageCounter{}, err } - err = listener.kv.readBucketTx(func(tx *nutsdb.Tx) error { + return counter, listener.kv.readBucketTx(func(tx *nutsdb.Tx) error { size, err := tx.LSize(listener.JobId, listener.retryBucket()) if err != nil { if errors.Is(err, nutsdb.ErrListNotFound) { @@ -247,18 +243,9 @@ func (listener *Listener[V]) messages() (bucket.MessageCounter, error) { return err } - unackMessage = size + counter.UnackMessage = size return nil }) - if err != nil { - return bucket.MessageCounter{}, err - } - - return bucket.MessageCounter{ - Name: listener.Id, - UnpublishMessage: unpublishMessage, - UnackMessage: unackMessage, - }, nil } func (listener *Listener[V]) enqueue(messages chan blockio.ResponseMessages) string { @@ -464,7 +451,6 @@ func (listener *Listener[V]) jobCatcher(name string, message io.Reader) error { err = backoff.Retry(func() error { return listener.kv.updateBucketTx(func(tx *nutsdb.Tx) error { - messageBytes := make([][]byte, 0) for idx, message := range messages { var ( id = fmt.Sprintf("%d_%d", prefix, idx) @@ -484,12 +470,10 @@ func (listener *Listener[V]) jobCatcher(name string, message io.Reader) error { return err } - messageBytes = append(messageBytes, b) - } - - err = tx.RPush(listener.JobId, listener.messageBucket(), messageBytes...) - if err != nil { - return err + err = tx.RPush(listener.JobId, listener.messageBucket(), b) + if err != nil { + return err + } } return nil