Skip to content

Commit

Permalink
Refactor metrics handling in QueueProcessor and ReplicationStatusProcessor
Browse files Browse the repository at this point in the history

Issue: bb-561
  • Loading branch information
KillianG committed Nov 6, 2024
1 parent ae963f9 commit 0d3062d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
9 changes: 8 additions & 1 deletion extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const BucketQueueEntry = require('../../../lib/models/BucketQueueEntry');
const ActionQueueEntry = require('../../../lib/models/ActionQueueEntry');
const MetricsProducer = require('../../../lib/MetricsProducer');
const libConstants = require('../../../lib/constants');
const { wrapCounterInc, wrapHistogramObserve } = require('../../../lib/util/metrics');
const { wrapCounterInc, wrapHistogramObserve, wrapGaugeSet } = require('../../../lib/util/metrics');
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');

const {
Expand Down Expand Up @@ -79,6 +79,12 @@ const metadataReplicationBytesMetric = ZenkoMetrics.createCounter({
labelNames: ['origin', 'serviceName', 'location'],
});

// Gauge for the number of entries still waiting to be consumed from the
// Kafka topic, published as `s3_replication_queue_lag`.
// It is exposed through `metricsHandler.lag` (wrapped with wrapGaugeSet).
// Labels not provided by defaultLabels (presumably `partition` and
// `serviceName` -- confirm against the caller) must be given when setting it.
const kafkaLagMetric = ZenkoMetrics.createGauge({
name: 's3_replication_queue_lag',
help: 'Number of update entries waiting to be consumed from the Kafka topic',
labelNames: ['origin', 'containerName', 'partition', 'serviceName'],
});

const sourceDataBytesMetric = ZenkoMetrics.createCounter({
name: 's3_replication_source_data_bytes_total',
help: 'Total number of data bytes read from replication source',
Expand Down Expand Up @@ -133,6 +139,7 @@ const metricsHandler = {
dataReplicationBytes: wrapCounterInc(dataReplicationBytesMetric, defaultLabels),
metadataReplicationBytes: wrapCounterInc(metadataReplicationBytesMetric, defaultLabels),
sourceDataBytes: wrapCounterInc(sourceDataBytesMetric, defaultLabels),
lag: wrapGaugeSet(kafkaLagMetric, defaultLabels),
reads: wrapCounterInc(readMetric, defaultLabels),
writes: wrapCounterInc(writeMetric, defaultLabels),
timeElapsed: wrapHistogramObserve(timeElapsedMetric, defaultLabels),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const constants = require('../../../lib/constants');
const {
wrapCounterInc,
wrapHistogramObserve,
wrapGaugeSet,
} = require('../../../lib/util/metrics');

/**
Expand Down Expand Up @@ -60,6 +61,12 @@ const loadMetricHandlers = jsutil.once(repConfig => {
labelNames: ['origin', 'replicationStatus'],
});

// Gauge for the number of entries still waiting to be consumed from the
// Kafka topic, published as `s3_replication_status_queue_lag`.
// It is returned to callers as the `lag` handler (wrapped with wrapGaugeSet);
// handleMetrics() sets it once per partition, passing `partition` and
// `serviceName` as labels and the consumer's lag stat as the value.
const kafkaLagMetric = ZenkoMetrics.createGauge({
name: 's3_replication_status_queue_lag',
help: 'Number of update entries waiting to be consumed from the Kafka topic',
labelNames: ['origin', 'containerName', 'partition', 'serviceName'],
});

const replicationStatusDurationSeconds = ZenkoMetrics.createHistogram({
name: 's3_replication_status_process_duration_seconds',
help: 'Duration of replication status processing',
Expand Down Expand Up @@ -138,6 +145,7 @@ const loadMetricHandlers = jsutil.once(repConfig => {
};
return {
status: wrapCounterInc(replicationStatusMetric, defaultLabels),
lag: wrapGaugeSet(kafkaLagMetric, defaultLabels),
statusDuration: wrapHistogramObserve(replicationStatusDurationSeconds,
defaultLabels),
replicationLatency: wrapHistogramObserve(replicationLatency,
Expand Down Expand Up @@ -552,6 +560,19 @@ class ReplicationStatusProcessor {
async handleMetrics(res, log) {
log.debug('metrics requested');
const metrics = await ZenkoMetrics.asPrometheus();

if (this.repConfig.queueProcessor.logConsumerMetricsIntervalS) {

Check warning on line 564 in extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js#L564

Added line #L564 was not covered by tests
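// The consumer's lag stats are refreshed on their own update cycle, so
// the lag gauges are refreshed here, each time metrics are requested.
// NOTE(review): `metrics` was already captured by the
// `await ZenkoMetrics.asPrometheus()` call above, so the gauge values set
// below can only show up in the *next* scrape. If `metrics` is what gets
// sent in the response, move this block before the asPrometheus() call.
// NOTE(review): `this._consumer` is dereferenced without a guard -- confirm
// it is always set before the metrics route can be hit.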
const lagStats = this._consumer.consumerStats.lag;
Object.keys(lagStats).forEach(partition => {
this.metricHandlers.lag({

Check warning on line 569 in extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js#L567-L569

Added lines #L567 - L569 were not covered by tests
partition,
serviceName: this.serviceName,
}, lagStats[partition]);
});
}

res.writeHead(200, {
'Content-Type': ZenkoMetrics.asPrometheusContentType(),
});
Expand Down

0 comments on commit 0d3062d

Please sign in to comment.