From 7d17764f642f10b78072fa984b5403195a7b024c Mon Sep 17 00:00:00 2001 From: Yudha Subki Date: Fri, 15 Dec 2023 05:42:16 +0700 Subject: [PATCH] feat: add metric publish message to the topic --- job.go | 13 +++++++++++++ pkg/metric/metric.go | 11 +++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/job.go b/job.go index ae42124..11ea768 100644 --- a/job.go +++ b/job.go @@ -8,9 +8,11 @@ import ( "github.com/google/uuid" "github.com/jmoiron/sqlx" "github.com/nutsdb/nutsdb" + "github.com/prometheus/client_golang/prometheus" "github.com/yudhasubki/blockqueue/pkg/cas" "github.com/yudhasubki/blockqueue/pkg/core" "github.com/yudhasubki/blockqueue/pkg/io" + "github.com/yudhasubki/blockqueue/pkg/metric" "github.com/yudhasubki/eventpool" "gopkg.in/guregu/null.v4" ) @@ -31,6 +33,11 @@ type Job[V chan io.ResponseMessages] struct { mtx *cas.SpinLock listeners map[uuid.UUID]*Listener[V] message chan bool + metric *jobMetric +} + +type jobMetric struct { + message prometheus.Counter } func newJob[V chan io.ResponseMessages](serverCtx context.Context, topic core.Topic, db *db, kv *kv) (*Job[V], error) { @@ -45,7 +52,11 @@ func newJob[V chan io.ResponseMessages](serverCtx context.Context, topic core.To message: make(chan bool, 20000), mtx: cas.New(), pool: eventpool.New(), + metric: &jobMetric{ + message: metric.MessagePublishedTopic(topic.Name), + }, } + prometheus.Register(job.metric.message) err := job.createBucket() if err != nil { @@ -277,6 +288,7 @@ func (job *Job[V]) remove() { logPrefixErr, err, ) } + prometheus.Unregister(job.metric.message) } func (job *Job[V]) fetchWaitingJob() { @@ -339,6 +351,7 @@ func (job *Job[V]) dispatchJob() error { } job.pool.Publish(eventpool.SendJson(messages)) + go job.metric.message.Inc() } return nil diff --git a/pkg/metric/metric.go b/pkg/metric/metric.go index d0d8f4e..dc16e3c 100644 --- a/pkg/metric/metric.go +++ b/pkg/metric/metric.go @@ -8,7 +8,14 @@ import ( var ( MessagePublished = prometheus.NewCounter(prometheus.CounterOpts{ - Name: fmt.Sprintf("message_store_to_the_topic_watcher"), - Help: "The total number succesfully published to the topic", + Name: "message_store_to_the_topic_watcher", + Help: "The total number succesfully published to the topic watcher", }) + + MessagePublishedTopic = func(topicName string) prometheus.Counter { + return prometheus.NewCounter(prometheus.CounterOpts{ + Name: fmt.Sprintf("message_published_%s", topicName), + Help: fmt.Sprintf("The total number succesfully published to the topic %s", topicName), + }) + } )