diff --git a/cmd/blockqueue/http.go b/cmd/blockqueue/http.go index 7703b1a..38b57f2 100644 --- a/cmd/blockqueue/http.go +++ b/cmd/blockqueue/http.go @@ -87,8 +87,9 @@ func (h *Http) Run(ctx context.Context, args []string) error { <-shutdown cancel() - stream.Close() + engine.Stop() + stream.Close() sqlite.Close() etcd.Close() diff --git a/job.go b/job.go index 11ea768..42b24fa 100644 --- a/job.go +++ b/job.go @@ -37,7 +37,7 @@ type Job[V chan io.ResponseMessages] struct { } type jobMetric struct { - message prometheus.Counter + message *prometheus.CounterVec } func newJob[V chan io.ResponseMessages](serverCtx context.Context, topic core.Topic, db *db, kv *kv) (*Job[V], error) { @@ -351,7 +351,7 @@ func (job *Job[V]) dispatchJob() error { } job.pool.Publish(eventpool.SendJson(messages)) - go job.metric.message.Inc() + go job.metric.message.WithLabelValues(job.Name).Inc() } return nil diff --git a/listener.go b/listener.go index 8729854..935b03a 100644 --- a/listener.go +++ b/listener.go @@ -67,8 +67,8 @@ type listenerOption struct { } type listenerMetric struct { - totalConsumedMessage prometheus.Counter - totalEnqueue prometheus.Gauge + totalConsumedMessage *prometheus.CounterVec + totalEnqueue *prometheus.GaugeVec } func newListener[V chan blockio.ResponseMessages](serverCtx context.Context, jobId string, subscriber core.Subscriber, bucket *kv) (*Listener[V], error) { @@ -274,7 +274,7 @@ func (listener *Listener[V]) enqueue(messages chan blockio.ResponseMessages) str Id: id, Value: messages, }) - go listener.metric.totalEnqueue.Inc() + go listener.metric.totalEnqueue.WithLabelValues(listener.JobId, listener.Id).Inc() listener.PriorityQueue.Cond.Broadcast() @@ -293,7 +293,7 @@ func (listener *Listener[V]) dequeue(id string) { } } - go listener.metric.totalEnqueue.Dec() + go listener.metric.totalEnqueue.WithLabelValues(listener.JobId, listener.Id).Dec() listener.PriorityQueue.Cond.Broadcast() } @@ -388,8 +388,8 @@ func (listener *Listener[V]) notify(response chan eventListener) { Kind: deliveryKindEventListener, } - go listener.metric.totalEnqueue.Dec() - go listener.metric.totalConsumedMessage.Add(float64(len(messages))) + go listener.metric.totalEnqueue.WithLabelValues(listener.JobId, listener.Id).Dec() + go listener.metric.totalConsumedMessage.WithLabelValues(listener.JobId, listener.Id).Add(float64(len(messages))) } func (listener *Listener[V]) watcher() { diff --git a/pkg/metric/metric.go b/pkg/metric/metric.go index 8424d1c..01df7d5 100644 --- a/pkg/metric/metric.go +++ b/pkg/metric/metric.go @@ -12,24 +12,30 @@ var ( Help: "The total number succesfully published to the topic watcher", }) - MessagePublishedTopic = func(topicName string) prometheus.Counter { - return prometheus.NewCounter(prometheus.CounterOpts{ - Name: fmt.Sprintf("message_published_%s", topicName), - Help: fmt.Sprintf("The total number succesfully published to the topic %s", topicName), - }) + MessagePublishedTopic = func(topicName string) *prometheus.CounterVec { + return prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "worker", + Subsystem: "dispatch_job", + Name: fmt.Sprintf("message_published_%s", topicName), + Help: fmt.Sprintf("The total number succesfully published to the topic %s", topicName), + }, []string{"topic"}) } - TotalFlightRequestQueueSubscriber = func(topicName, subscriberName string) prometheus.Gauge { - return prometheus.NewGauge(prometheus.GaugeOpts{ - Name: fmt.Sprintf("total_flight_request_queue_topic_%s_subscriber_%s", topicName, subscriberName), - Help: fmt.Sprintf("The current total flight request queue on topic %s subscriber %s", topicName, subscriberName), - }) + TotalFlightRequestQueueSubscriber = func(topicName, subscriberName string) *prometheus.GaugeVec { + return prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "request", + Subsystem: "queue", + Name: fmt.Sprintf("total_flight_request_queue_topic_%s_subscriber_%s", topicName, subscriberName), + Help: fmt.Sprintf("The current total flight request queue on topic %s subscriber %s", topicName, subscriberName), + }, []string{"topic", "subscriber"}) } - TotalConsumedMessage = func(topicName, subscriberName string) prometheus.Counter { - return prometheus.NewCounter(prometheus.CounterOpts{ - Name: fmt.Sprintf("total_consumed_message_topic_%s_subscriber_%s", topicName, subscriberName), - Help: fmt.Sprintf("The current total consumed message on topic %s subscriber %s", topicName, subscriberName), - }) + TotalConsumedMessage = func(topicName, subscriberName string) *prometheus.CounterVec { + return prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "request", + Subsystem: "message", + Name: fmt.Sprintf("total_consumed_message_topic_%s_subscriber_%s", topicName, subscriberName), + Help: fmt.Sprintf("The current total consumed message on topic %s subscriber %s", topicName, subscriberName), + }, []string{"topic", "subscriber"}) } )