diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js
index ec1eddfd7..025d2e8d6 100644
--- a/extensions/replication/queueProcessor/QueueProcessor.js
+++ b/extensions/replication/queueProcessor/QueueProcessor.js
@@ -30,7 +30,7 @@ const BucketQueueEntry = require('../../../lib/models/BucketQueueEntry');
 const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry');
 const MetricsProducer = require('../../../lib/MetricsProducer');
 const libConstants = require('../../../lib/constants');
-const { wrapCounterInc, wrapHistogramObserve } = require('../../../lib/util/metrics');
+const { wrapCounterInc, wrapHistogramObserve, wrapGaugeSet } = require('../../../lib/util/metrics');
 const { http: HttpAgent, https: HttpsAgent } = require('httpagent');
 
 const {
@@ -79,6 +79,12 @@ const metadataReplicationBytesMetric = ZenkoMetrics.createCounter({
     labelNames: ['origin', 'serviceName', 'location'],
 });
 
+const kafkaLagMetric = ZenkoMetrics.createGauge({
+    name: 's3_replication_queue_lag',
+    help: 'Number of update entries waiting to be consumed from the Kafka topic',
+    labelNames: ['origin', 'containerName', 'partition', 'serviceName'],
+});
+
 const sourceDataBytesMetric = ZenkoMetrics.createCounter({
     name: 's3_replication_source_data_bytes_total',
     help: 'Total number of data bytes read from replication source',
@@ -133,6 +139,7 @@ const metricsHandler = {
     dataReplicationBytes: wrapCounterInc(dataReplicationBytesMetric, defaultLabels),
     metadataReplicationBytes: wrapCounterInc(metadataReplicationBytesMetric, defaultLabels),
     sourceDataBytes: wrapCounterInc(sourceDataBytesMetric, defaultLabels),
+    lag: wrapGaugeSet(kafkaLagMetric, defaultLabels),
     reads: wrapCounterInc(readMetric, defaultLabels),
     writes: wrapCounterInc(writeMetric, defaultLabels),
     timeElapsed: wrapHistogramObserve(timeElapsedMetric, defaultLabels),
diff --git a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
index 4c366473c..ab05e05ea 100644
--- a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
+++ b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
@@ -25,6 +25,7 @@ const constants = require('../../../lib/constants');
 const {
     wrapCounterInc,
     wrapHistogramObserve,
+    wrapGaugeSet,
 } = require('../../../lib/util/metrics');
 
 /**
@@ -60,6 +61,12 @@ const loadMetricHandlers = jsutil.once(repConfig => {
         labelNames: ['origin', 'replicationStatus'],
     });
 
+    const kafkaLagMetric = ZenkoMetrics.createGauge({
+        name: 's3_replication_status_queue_lag',
+        help: 'Number of update entries waiting to be consumed from the Kafka topic',
+        labelNames: ['origin', 'containerName', 'partition', 'serviceName'],
+    });
+
     const replicationStatusDurationSeconds = ZenkoMetrics.createHistogram({
         name: 's3_replication_status_process_duration_seconds',
         help: 'Duration of replication status processing',
@@ -138,6 +145,7 @@ const loadMetricHandlers = jsutil.once(repConfig => {
     };
     return {
         status: wrapCounterInc(replicationStatusMetric, defaultLabels),
+        lag: wrapGaugeSet(kafkaLagMetric, defaultLabels),
         statusDuration: wrapHistogramObserve(replicationStatusDurationSeconds,
             defaultLabels),
         replicationLatency: wrapHistogramObserve(replicationLatency,
@@ -552,6 +560,20 @@ class ReplicationStatusProcessor {
     async handleMetrics(res, log) {
         log.debug('metrics requested');
-        const metrics = await ZenkoMetrics.asPrometheus();
+
+        if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS) {
+            // consumer lag stats are refreshed on their own cycle: push the
+            // latest values into the gauge BEFORE the registry snapshot is
+            // taken below, otherwise the reported lag is one scrape stale
+            const lagStats = this._consumer.consumerStats.lag;
+            Object.keys(lagStats).forEach(partition => {
+                this.metricHandlers.lag({
+                    partition,
+                    serviceName: this.serviceName,
+                }, lagStats[partition]);
+            });
+        }
+
+        const metrics = await ZenkoMetrics.asPrometheus();
         res.writeHead(200, {
             'Content-Type': ZenkoMetrics.asPrometheusContentType(),
         });