Skip to content

Commit

Permalink
feat: add metric publish message to the topic
Browse files Browse the repository at this point in the history
  • Loading branch information
yudhasubki committed Dec 14, 2023
1 parent 4a2ad4e commit 7d17764
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
13 changes: 13 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/nutsdb/nutsdb"
"github.com/prometheus/client_golang/prometheus"
"github.com/yudhasubki/blockqueue/pkg/cas"
"github.com/yudhasubki/blockqueue/pkg/core"
"github.com/yudhasubki/blockqueue/pkg/io"
"github.com/yudhasubki/blockqueue/pkg/metric"
"github.com/yudhasubki/eventpool"
"gopkg.in/guregu/null.v4"
)
Expand All @@ -31,6 +33,11 @@ type Job[V chan io.ResponseMessages] struct {
mtx *cas.SpinLock
listeners map[uuid.UUID]*Listener[V]
message chan bool
metric *jobMetric
}

type jobMetric struct {
message prometheus.Counter
}

func newJob[V chan io.ResponseMessages](serverCtx context.Context, topic core.Topic, db *db, kv *kv) (*Job[V], error) {
Expand All @@ -45,7 +52,11 @@ func newJob[V chan io.ResponseMessages](serverCtx context.Context, topic core.To
message: make(chan bool, 20000),
mtx: cas.New(),
pool: eventpool.New(),
metric: &jobMetric{
message: metric.MessagePublishedTopic(topic.Name),
},
}
prometheus.Register(job.metric.message)

err := job.createBucket()
if err != nil {
Expand Down Expand Up @@ -277,6 +288,7 @@ func (job *Job[V]) remove() {
logPrefixErr, err,
)
}
prometheus.Unregister(job.metric.message)
}

func (job *Job[V]) fetchWaitingJob() {
Expand Down Expand Up @@ -339,6 +351,7 @@ func (job *Job[V]) dispatchJob() error {
}

job.pool.Publish(eventpool.SendJson(messages))
go job.metric.message.Inc()
}

return nil
Expand Down
11 changes: 9 additions & 2 deletions pkg/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ import (

var (
MessagePublished = prometheus.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("message_store_to_the_topic_watcher"),
Help: "The total number succesfully published to the topic",
Name: "message_store_to_the_topic_watcher",
Help: "The total number succesfully published to the topic watcher",
})

MessagePublishedTopic = func(topicName string) prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("message_published_%s", topicName),
Help: fmt.Sprintf("The total number succesfully published to the topic %s", topicName),
})
}
)

0 comments on commit 7d17764

Please sign in to comment.