Skip to content

Commit

Permalink
refactor: change common metric with suffx-vec
Browse files Browse the repository at this point in the history
  • Loading branch information
yudhasubki committed Dec 16, 2023
1 parent 1a1e480 commit d77c7cd
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 24 deletions.
3 changes: 2 additions & 1 deletion cmd/blockqueue/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ func (h *Http) Run(ctx context.Context, args []string) error {
<-shutdown

cancel()
stream.Close()

engine.Stop()
stream.Close()
sqlite.Close()
etcd.Close()

Expand Down
4 changes: 2 additions & 2 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Job[V chan io.ResponseMessages] struct {
}

type jobMetric struct {
message prometheus.Counter
message *prometheus.CounterVec
}

func newJob[V chan io.ResponseMessages](serverCtx context.Context, topic core.Topic, db *db, kv *kv) (*Job[V], error) {
Expand Down Expand Up @@ -351,7 +351,7 @@ func (job *Job[V]) dispatchJob() error {
}

job.pool.Publish(eventpool.SendJson(messages))
go job.metric.message.Inc()
go job.metric.message.WithLabelValues(job.Name).Inc()
}

return nil
Expand Down
12 changes: 6 additions & 6 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ type listenerOption struct {
}

type listenerMetric struct {
totalConsumedMessage prometheus.Counter
totalEnqueue prometheus.Gauge
totalConsumedMessage *prometheus.CounterVec
totalEnqueue *prometheus.GaugeVec
}

func newListener[V chan blockio.ResponseMessages](serverCtx context.Context, jobId string, subscriber core.Subscriber, bucket *kv) (*Listener[V], error) {
Expand Down Expand Up @@ -274,7 +274,7 @@ func (listener *Listener[V]) enqueue(messages chan blockio.ResponseMessages) str
Id: id,
Value: messages,
})
go listener.metric.totalEnqueue.Inc()
go listener.metric.totalEnqueue.WithLabelValues(listener.JobId, listener.Id).Inc()

listener.PriorityQueue.Cond.Broadcast()

Expand All @@ -293,7 +293,7 @@ func (listener *Listener[V]) dequeue(id string) {
}
}

go listener.metric.totalEnqueue.Dec()
go listener.metric.totalEnqueue.WithLabelValues(listener.JobId, listener.Id).Dec()
listener.PriorityQueue.Cond.Broadcast()
}

Expand Down Expand Up @@ -388,8 +388,8 @@ func (listener *Listener[V]) notify(response chan eventListener) {
Kind: deliveryKindEventListener,
}

go listener.metric.totalEnqueue.Dec()
go listener.metric.totalConsumedMessage.Add(float64(len(messages)))
go listener.metric.totalEnqueue.WithLabelValues(listener.JobId, listener.Id).Dec()
go listener.metric.totalConsumedMessage.WithLabelValues(listener.JobId, listener.Id).Add(float64(len(messages)))
}

func (listener *Listener[V]) watcher() {
Expand Down
36 changes: 21 additions & 15 deletions pkg/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,30 @@ var (
Help: "The total number succesfully published to the topic watcher",
})

MessagePublishedTopic = func(topicName string) prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("message_published_%s", topicName),
Help: fmt.Sprintf("The total number succesfully published to the topic %s", topicName),
})
MessagePublishedTopic = func(topicName string) *prometheus.CounterVec {
return prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "worker",
Subsystem: "dispatch_job",
Name: fmt.Sprintf("message_published_%s", topicName),
Help: fmt.Sprintf("The total number succesfully published to the topic %s", topicName),
}, []string{"topic"})
}

TotalFlightRequestQueueSubscriber = func(topicName, subscriberName string) prometheus.Gauge {
return prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("total_flight_request_queue_topic_%s_subscriber_%s", topicName, subscriberName),
Help: fmt.Sprintf("The current total flight request queue on topic %s subscriber %s", topicName, subscriberName),
})
TotalFlightRequestQueueSubscriber = func(topicName, subscriberName string) *prometheus.GaugeVec {
return prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "request",
Subsystem: "queue",
Name: fmt.Sprintf("total_flight_request_queue_topic_%s_subscriber_%s", topicName, subscriberName),
Help: fmt.Sprintf("The current total flight request queue on topic %s subscriber %s", topicName, subscriberName),
}, []string{"topic", "subscriber"})
}

TotalConsumedMessage = func(topicName, subscriberName string) prometheus.Counter {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("total_consumed_message_topic_%s_subscriber_%s", topicName, subscriberName),
Help: fmt.Sprintf("The current total consumed message on topic %s subscriber %s", topicName, subscriberName),
})
TotalConsumedMessage = func(topicName, subscriberName string) *prometheus.CounterVec {
return prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "request",
Subsystem: "message",
Name: fmt.Sprintf("total_consumed_message_topic_%s_subscriber_%s", topicName, subscriberName),
Help: fmt.Sprintf("The current total consumed message on topic %s subscriber %s", topicName, subscriberName),
}, []string{"topic", "subscriber"})
}
)

0 comments on commit d77c7cd

Please sign in to comment.