diff --git a/extensions/notification/queueProcessor/QueueProcessor.js b/extensions/notification/queueProcessor/QueueProcessor.js index e1bbabff4..86f4011ab 100644 --- a/extensions/notification/queueProcessor/QueueProcessor.js +++ b/extensions/notification/queueProcessor/QueueProcessor.js @@ -27,6 +27,29 @@ function onQueueProcessorEventProcessed(destination, eventType) { }); } +const kafkaLagMetric = ZenkoMetrics.createGauge({ + name: 's3_replication_queue_lag', + help: 'Number of update entries waiting to be consumed from the Kafka topic', + labelNames: ['origin', 'containerName', 'partition', 'serviceName'], +}); + + +/** + * Contains methods to incrememt different metrics + * @typedef {Object} MetricsHandler + * @property {CounterInc} dataReplicationStatus - Increments the replication status metric for data operation + * @property {CounterInc} metadataReplicationStatus - Increments the replication status metric for metadata operation + * @property {CounterInc} dataReplicationBytes - Increments the replication bytes metric for data operation + * @property {CounterInc} metadataReplicationBytes - Increments the replication bytes metric for metadata operation + * @property {CounterInc} sourceDataBytes - Increments the source data bytes metric + * @property {CounterInc} reads - Increments the read metric + * @property {CounterInc} writes - Increments the write metric + * @property {HistogramObserve} timeElapsed - Observes the time elapsed metric + */ +const metricsHandler = { + lag: wrapGaugeSet(kafkaLagMetric, defaultLabels), +}; + class QueueProcessor extends EventEmitter { /** * Create a queue processor object to activate notification from a @@ -329,6 +352,19 @@ class QueueProcessor extends EventEmitter { */ async handleMetrics(res, log) { log.debug('metrics requested'); + + if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS && this._consumer) { + // consumer stats lag is on a different update cycle so we need to + // update the metrics when requested + const lagStats = this._consumer.consumerStats.lag; + Object.keys(lagStats).forEach(partition => { + metricsHandler.lag({ + partition, + serviceName: this.serviceName, + }, lagStats[partition]); + }); + } + res.writeHead(200, { 'Content-Type': ZenkoMetrics.asPrometheusContentType(), }); diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js index 025d2e8d6..cb2e59b48 100644 --- a/extensions/replication/queueProcessor/QueueProcessor.js +++ b/extensions/replication/queueProcessor/QueueProcessor.js @@ -1029,7 +1029,7 @@ class QueueProcessor extends EventEmitter { static async handleMetrics(res, log) { log.debug('metrics requested'); - if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS) { + if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS && this._consumer) { // consumer stats lag is on a different update cycle so we need to // update the metrics when requested const lagStats = this._consumer.consumerStats.lag; diff --git a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js index ab05e05ea..14dd84066 100644 --- a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js +++ b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js @@ -561,7 +561,7 @@ class ReplicationStatusProcessor { log.debug('metrics requested'); const metrics = await ZenkoMetrics.asPrometheus(); - if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS) { + if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS && this._consumer) { // consumer stats lag is on a different update cycle so we need to // update the metrics when requested const lagStats = this._consumer.consumerStats.lag;