From fe920991b636b14b4ac1bcf2855aad317244e48b Mon Sep 17 00:00:00 2001
From: Mickael Bourgois
Date: Mon, 4 Nov 2024 15:04:11 +0100
Subject: [PATCH 01/11] BB-624: Suppress AWS SDK v3 warning in stderr

Missing from S3C-9338 because this usage of the SDK does not use the env variable
---
 lib/clients/BackbeatClient.js | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lib/clients/BackbeatClient.js b/lib/clients/BackbeatClient.js
index 2c0a18cf4..11e527588 100644
--- a/lib/clients/BackbeatClient.js
+++ b/lib/clients/BackbeatClient.js
@@ -1,6 +1,8 @@
 const AWS = require('aws-sdk');
 const Service = require('aws-sdk').Service;
 
+require('aws-sdk/lib/maintenance_mode_message').suppress = true;
+
 // for more info, see how S3 client is configured in aws-sdk
 // (clients/s3.js and lib/services/s3.js)
 

From ec7d2f0462ddca5006fad6469a6ce9c2a48c2317 Mon Sep 17 00:00:00 2001
From: Mickael Bourgois
Date: Mon, 4 Nov 2024 15:32:33 +0100
Subject: [PATCH 02/11] BB-624: Improve image size from 2.34G to 1.55G

Will be improved further in S3C-9479
---
 .dockerignore                  | 20 ++++++++++++++++++++
 images/nodesvc-base/Dockerfile | 16 +++++++++++++---
 2 files changed, 33 insertions(+), 3 deletions(-)
 create mode 100644 .dockerignore

diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 000000000..3ad0b11c9
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,20 @@
+node_modules
+localData/*
+localMetadata/*
+.git
+.github
+.tox
+coverage
+.DS_Store
+
+.dockerignore
+docs/
+images/
+res/
+_config.yml
+.eslintrc
+.gitignore
+circle.yml
+DESIGN.md
+README.md
+Using.md
diff --git a/images/nodesvc-base/Dockerfile b/images/nodesvc-base/Dockerfile
index f9f770d5a..f9ca9cc92 100644
--- a/images/nodesvc-base/Dockerfile
+++ b/images/nodesvc-base/Dockerfile
@@ -8,15 +8,25 @@ RUN yarn cache clean \
     && yarn install --production --frozen-lockfile --ignore-optional --ignore-engines --network-concurrency 1 \
     && yarn cache clean
 
+# Somehow, running chown in another layer replaces some symlinks built in node_modules with copies of the files
+# Chown here to keep the same output as previously
+RUN chown -R ${USER} /tmp/build/node_modules
+
 ################################################################################
 FROM ghcr.io/scality/federation/nodesvc-base:7.10.7.0
 
 WORKDIR ${HOME_DIR}/backbeat
+RUN chown -R ${USER} ${HOME_DIR}/backbeat
 
-COPY . ${HOME_DIR}/backbeat
-COPY --from=builder /tmp/build/node_modules ./node_modules/
+# Keep same output as chown command without group (use group 0)
+COPY --chown=${USER}:0 . ${HOME_DIR}/backbeat
+COPY --from=builder --chown=${USER}:0 /tmp/build/node_modules ./node_modules/
 
-RUN chown -R ${USER} ${HOME_DIR}/backbeat
+# All copied files are already chowned scality(1000):root(0)
+# This removes a 763MB layer. Image goes from 2.34GB to 1.55GB
+# Besides S3C Federation will chown scality(1001):scality(1001) at poststart
+# TODO S3C-9479: Images will be reworked for size optimization
+# RUN chown -R ${USER} ${HOME_DIR}/backbeat
 
 USER ${USER}
 

From 68aa63dff4ac583b83c6e3791a02219c2274327d Mon Sep 17 00:00:00 2001
From: Giacomo Guiulfo
Date: Mon, 21 May 2018 14:12:48 -0700
Subject: [PATCH 03/11] bf: crash when unable to connect with kafka

(cherry picked from commit 101af70c242cd7aec100c2f3592a914f4b672f2c)
---
 lib/BackbeatProducer.js | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/lib/BackbeatProducer.js b/lib/BackbeatProducer.js
index 9d77d70f0..de5b06473 100644
--- a/lib/BackbeatProducer.js
+++ b/lib/BackbeatProducer.js
@@ -69,7 +69,14 @@ class BackbeatProducer extends EventEmitter {
             'request.timeout.ms': ACK_TIMEOUT,
         });
         this._ready = false;
-        this._producer.connect();
+        this._producer.connect({ timeout: 30000 }, () => {
+            const opts = { topic: 'backbeat-sanitycheck', timeout: 10000 };
+            this._producer.getMetadata(opts, err => {
+                if (err) {
+                    this.emit('error', err);
+                }
+            });
+        });
         this._producer.on('ready', () => {
             this._ready = true;
             this.emit('ready');

From caf220eca6624f4173070e9ca3866cbca41b6acb Mon Sep 17 00:00:00 2001
From: Giacomo Guiulfo
Date: Tue, 5 Jun 2018 18:08:02 -0700
Subject: [PATCH 04/11] bf: crash backbeat consumer when unable to connect with kafka

(cherry picked from commit 22282ac2f0a38515ae0e32ebb1923206e56eb778)
---
 .../ReplicationStatusProcessor.js | 9 ++++++++-
 lib/BackbeatConsumer.js           | 9 ++++++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
index bb3871f42..41d1d2a55 100644
--- a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
+++ b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
@@ -315,6 +315,7 @@ class ReplicationStatusProcessor {
      * @return {undefined}
      */
     start(options, cb) {
+        let consumerReady = false;
         this._FailedCRRProducer = new FailedCRRProducer(this.kafkaConfig);
         this._replayTopicNames.forEach(t => {
             this._ReplayProducers[t] = new ReplayProducer(this.kafkaConfig, t);
@@ -332,8 +333,14 @@ class ReplicationStatusProcessor {
             bootstrap: options && options.bootstrap,
             logConsumerMetricsIntervalS: this.repConfig.replicationStatusProcessor.logConsumerMetricsIntervalS,
         });
-        this._consumer.on('error', () => { });
+        this._consumer.on('error', () => {
+            if (!consumerReady) {
+                this.logger.fatal('error starting a backbeat consumer');
+                process.exit(1);
+            }
+        });
         this._consumer.on('ready', () => {
+            consumerReady = true;
             this.logger.info('replication status processor is ready to ' +
                 'consume replication status entries');
             this._consumer.subscribe();
diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js
index 90227c340..c35aa74a9 100644
--- a/lib/BackbeatConsumer.js
+++ b/lib/BackbeatConsumer.js
@@ -186,7 +186,14 @@ class BackbeatConsumer extends EventEmitter {
             consumerParams['client.rack'] = this._site;
         }
         this._consumer = new kafka.KafkaConsumer(consumerParams);
-        this._consumer.connect();
+        this._consumer.connect({ timeout: 10000 }, () => {
+            const opts = { topic: 'backbeat-sanitycheck', timeout: 10000 };
+            this._consumer.getMetadata(opts, err => {
+                if (err) {
+                    this.emit('error', err);
+                }
+            });
+        });
         return this._consumer.once('ready', () => {
             this._consumerReady = true;
             this._checkIfReady();
From 7fb4dc19dd21c466dfe3fc05873a4069d011cef0 Mon Sep 17 00:00:00 2001
From: Giacomo Guiulfo
Date: Tue, 12 Jun 2018 18:27:33 -0700
Subject: [PATCH 05/11] bf: crash backbeat components when unable to connect to kafka

(cherry picked from commit 3c2aa762027861caae6c061ce0c78dddf32a64a5)
---
 .../replication/failedCRR/FailedCRRConsumer.js   | 12 +++++++++++-
 .../replication/queueProcessor/QueueProcessor.js | 10 +++++++++-
 lib/MetricsConsumer.js                           |  9 ++++++++-
 3 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/extensions/replication/failedCRR/FailedCRRConsumer.js b/extensions/replication/failedCRR/FailedCRRConsumer.js
index f50d69b58..3841f694b 100644
--- a/extensions/replication/failedCRR/FailedCRRConsumer.js
+++ b/extensions/replication/failedCRR/FailedCRRConsumer.js
@@ -35,6 +35,7 @@ class FailedCRRConsumer {
      * @return {undefined}
      */
     start(cb) {
+        let consumerReady = false;
         const consumer = new BackbeatConsumer({
             kafka: {
                 hosts: this._kafkaConfig.hosts,
@@ -46,8 +47,17 @@ class FailedCRRConsumer {
             queueProcessor: this.processKafkaEntry.bind(this),
             fetchMaxBytes: CONSUMER_FETCH_MAX_BYTES,
         });
-        consumer.on('error', () => {});
+        consumer.on('error', err => {
+            if (!consumerReady) {
+                this.logger.fatal('could not setup a backbeat consumer', {
+                    method: 'FailedCRRConsumer.start',
+                    error: err,
+                });
+                process.exit(1);
+            }
+        });
         consumer.on('ready', () => {
+            consumerReady = true;
             consumer.subscribe();
             this.logger.info('retry consumer is ready to consume entries');
         });
diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js
index d133b993f..74c924df9 100644
--- a/extensions/replication/queueProcessor/QueueProcessor.js
+++ b/extensions/replication/queueProcessor/QueueProcessor.js
@@ -399,6 +399,7 @@ class QueueProcessor extends EventEmitter {
      */
     start(options) {
         this._setupProducer(err => {
+            let consumerReady = false;
             if (err) {
                 this.logger.info('error setting up kafka producer', {
                     error: err.message });
@@ -421,8 +422,15 @@ class QueueProcessor extends EventEmitter {
                 queueProcessor: this.processKafkaEntry.bind(this),
                 logConsumerMetricsIntervalS: this.repConfig.queueProcessor.logConsumerMetricsIntervalS,
             });
-            this._consumer.on('error', () => { });
+            this._consumer.on('error', () => {
+                if (!consumerReady) {
+                    this.logger.fatal('queue processor failed to start a ' +
+                        'backbeat consumer');
+                    process.exit(1);
+                }
+            });
             this._consumer.on('ready', () => {
+                consumerReady = true;
                 this._consumer.subscribe();
                 this.logger.info('queue processor is ready to consume ' +
                     `replication entries from ${this.topic}`);
diff --git a/lib/MetricsConsumer.js b/lib/MetricsConsumer.js
index 48e5e28c0..ca629db0b 100644
--- a/lib/MetricsConsumer.js
+++ b/lib/MetricsConsumer.js
@@ -39,6 +39,7 @@ class MetricsConsumer {
     }
 
     start() {
+        let consumerReady = false;
         const consumer = new BackbeatConsumer({
             kafka: {
                 hosts: this.kafkaConfig.hosts,
@@ -50,8 +51,14 @@ class MetricsConsumer {
             queueProcessor: this.processKafkaEntry.bind(this),
             fetchMaxBytes: CONSUMER_FETCH_MAX_BYTES,
         });
-        consumer.on('error', () => {});
+        consumer.on('error', () => {
+            if (!consumerReady) {
+                this.logger.fatal('error starting metrics consumer');
+                process.exit(1);
+            }
+        });
         consumer.on('ready', () => {
+            consumerReady = true;
             consumer.subscribe();
             this.logger.info('metrics processor is ready to consume entries');
         });

From 95ad18428985f7a7bdbdcbe5156c37f35d06624e Mon Sep 17 00:00:00 2001
From: Mickael Bourgois
Date: Mon, 4 Nov 2024 16:33:46 +0100
Subject: [PATCH 06/11] BB-624: S3C specific handling of startup failure

---
 lib/BackbeatConsumer.js | 12 +++++-------
 lib/BackbeatProducer.js | 12 +++++-------
 2 files changed, 10 insertions(+), 14 deletions(-)

diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js
index c35aa74a9..f5925cb23 100644
--- a/lib/BackbeatConsumer.js
+++ b/lib/BackbeatConsumer.js
@@ -186,13 +186,11 @@ class BackbeatConsumer extends EventEmitter {
             consumerParams['client.rack'] = this._site;
         }
         this._consumer = new kafka.KafkaConsumer(consumerParams);
-        this._consumer.connect({ timeout: 10000 }, () => {
-            const opts = { topic: 'backbeat-sanitycheck', timeout: 10000 };
-            this._consumer.getMetadata(opts, err => {
-                if (err) {
-                    this.emit('error', err);
-                }
-            });
+        // S3C uses a 1m timeout because zookeeper / kafka can start after backbeat, and slowly
+        this._consumer.connect({ timeout: 60000 }, (err) => {
+            if (err) {
+                this.emit('error', err);
+            }
         });
         return this._consumer.once('ready', () => {
             this._consumerReady = true;
diff --git a/lib/BackbeatProducer.js b/lib/BackbeatProducer.js
index de5b06473..57793ab58 100644
--- a/lib/BackbeatProducer.js
+++ b/lib/BackbeatProducer.js
@@ -69,13 +69,11 @@ class BackbeatProducer extends EventEmitter {
             'request.timeout.ms': ACK_TIMEOUT,
         });
         this._ready = false;
-        this._producer.connect({ timeout: 30000 }, () => {
-            const opts = { topic: 'backbeat-sanitycheck', timeout: 10000 };
-            this._producer.getMetadata(opts, err => {
-                if (err) {
-                    this.emit('error', err);
-                }
-            });
+        // S3C uses a 1m timeout because zookeeper / kafka can start after backbeat, and slowly
+        this._producer.connect({ timeout: 60000 }, (err) => {
+            if (err) {
+                this.emit('error', err);
+            }
         });
         this._producer.on('ready', () => {
             this._ready = true;

From da391d433cd869452ecf1e7863d36c8b4b084e7f Mon Sep 17 00:00:00 2001
From: Mickael Bourgois
Date: Mon, 4 Nov 2024 16:58:17 +0100
Subject: [PATCH 07/11] BB-624: backport some notification startup fixes

Copied from eb1e608766fecaf52ac2710345656a8bb2ed8b67
---
 .../notification/destination/KafkaProducer.js |  3 +-
 .../queueProcessor/QueueProcessor.js          | 31 +++++++++++++++----
 .../notification/queueProcessor/task.js       |  6 +++-
 3 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/extensions/notification/destination/KafkaProducer.js b/extensions/notification/destination/KafkaProducer.js
index 83a57129e..3aa358d32 100644
--- a/extensions/notification/destination/KafkaProducer.js
+++ b/extensions/notification/destination/KafkaProducer.js
@@ -72,12 +72,13 @@ class KafkaProducer extends EventEmitter {
             'request.timeout.ms': ACK_TIMEOUT,
         });
         this._ready = false;
-        this._producer.connect({}, error => {
+        this._producer.connect({ timeout: 60000 }, error => {
             if (error) {
                 this._log.info('error connecting to broker', {
                     error,
                     method: 'KafkaProducer.constructor',
                 });
+                this.emit('error', error);
             }
         });
         this._producer.on('ready', () => {
diff --git a/extensions/notification/queueProcessor/QueueProcessor.js b/extensions/notification/queueProcessor/QueueProcessor.js
index ece00ce39..099e3ae4d 100644
--- a/extensions/notification/queueProcessor/QueueProcessor.js
+++ b/extensions/notification/queueProcessor/QueueProcessor.js
@@ -116,9 +116,10 @@ class QueueProcessor extends EventEmitter {
      * @param {boolean} [options.disableConsumer] - true to disable
      * startup of consumer (for testing: one has to call
      * processQueueEntry() explicitly)
+     * @param {function} done - callback
      * @return {undefined}
      */
-    start(options) {
+    start(options, done) {
         async.series([
             next => this._setupZookeeper(next),
             next => this._setupNotificationConfigManager(next),
@@ -126,7 +127,7 @@ class QueueProcessor extends EventEmitter {
             next => this._destination.init(() => {
                 if (options && options.disableConsumer) {
                     this.emit('ready');
-                    return undefined;
+                    return next();
                 }
                 const { groupId, concurrency, logConsumerMetricsIntervalS }
                     = this.notifConfig.queueProcessor;
@@ -142,22 +143,31 @@ class QueueProcessor extends EventEmitter {
                     queueProcessor: this.processKafkaEntry.bind(this),
                     logConsumerMetricsIntervalS,
                 });
-                this._consumer.on('error', () => { });
+                this._consumer.on('error', err => {
+                    this.logger.error('error starting notification consumer',
+                        { method: 'QueueProcessor.start', error: err.message });
+                    // crash if we got an error at startup
+                    if (!this.isReady()) {
+                        return next(err);
+                    }
+                    return undefined;
+                });
                 this._consumer.on('ready', () => {
                     this._consumer.subscribe();
                     this.logger.info('queue processor is ready to consume ' +
                         'notification entries');
                     this.emit('ready');
+                    return next();
                 });
-                return next();
+                return undefined;
             }),
         ], err => {
             if (err) {
                 this.logger.info('error starting notification queue processor',
                     { error: err.message });
-                return undefined;
+                return done(err);
             }
-            return undefined;
+            return done();
         });
     }
 
@@ -261,6 +271,15 @@ class QueueProcessor extends EventEmitter {
             return done();
         }
     }
+
+    /**
+     * Checks if queue processor is ready to consume
+     *
+     * @returns {boolean} is queue processor ready
+     */
+    isReady() {
+        return this._consumer && this._consumer.isReady();
+    }
 }
 
 module.exports = QueueProcessor;
diff --git a/extensions/notification/queueProcessor/task.js b/extensions/notification/queueProcessor/task.js
index 4441f4326..9e159ae8c 100644
--- a/extensions/notification/queueProcessor/task.js
+++ b/extensions/notification/queueProcessor/task.js
@@ -24,7 +24,11 @@ try {
         'could not be found in destinations defined');
     const queueProcessor = new QueueProcessor(
         zkConfig, kafkaConfig, notifConfig, destinationConfig, destination);
-    queueProcessor.start();
+    queueProcessor.start(undefined, err => {
+        if (err) {
+            process.exit(1);
+        }
+    });
 } catch (err) {
     log.error('error starting notification queue processor task', {
         method: 'notification.task.queueProcessor.start',

From dbbceea4d16eae95841181817819304502d31f6c Mon Sep 17 00:00:00 2001
From: Mickael Bourgois
Date: Mon, 4 Nov 2024 20:05:40 +0100
Subject: [PATCH 08/11] BB-624: Don't ignore setup error for CRR status

Prevent hanging indefinitely if the replication status processor's
BackbeatConsumer succeeds in connecting to kafka but FailedCRRProducer
or ReplayProducer then fails
---
 extensions/replication/failedCRR/FailedCRRProducer.js | 2 +-
 extensions/replication/replay/ReplayProducer.js       | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/extensions/replication/failedCRR/FailedCRRProducer.js b/extensions/replication/failedCRR/FailedCRRProducer.js
index a48ea0281..01fcae9c7 100644
--- a/extensions/replication/failedCRR/FailedCRRProducer.js
+++ b/extensions/replication/failedCRR/FailedCRRProducer.js
@@ -29,7 +29,7 @@ class FailedCRRProducer {
             kafka: { hosts: this._kafkaConfig.hosts },
             topic: this._topic,
         });
-        this._producer.once('error', () => {});
+        this._producer.once('error', cb);
         this._producer.once('ready', () => {
             this._producer.removeAllListeners('error');
             this._producer.on('error', err =>
diff --git a/extensions/replication/replay/ReplayProducer.js b/extensions/replication/replay/ReplayProducer.js
index 716835c42..1d72d5549 100644
--- a/extensions/replication/replay/ReplayProducer.js
+++ b/extensions/replication/replay/ReplayProducer.js
@@ -29,7 +29,7 @@ class ReplayProducer {
             kafka: { hosts: this._kafkaConfig.hosts },
             topic: this._topic,
         });
-        this._producer.once('error', () => {});
+        this._producer.once('error', cb);
         this._producer.once('ready', () => {
             this._producer.removeAllListeners('error');
             this._producer.on('error', err =>

From f40a1b2b5ed6c73a18e8a71538e2570ae0a1bb1d Mon Sep 17 00:00:00 2001
From: Mickael Bourgois
Date: Mon, 4 Nov 2024 19:51:23 +0100
Subject: [PATCH 09/11] BB-624 S3C-9436: Prevent startup crash on probing

If probed too soon during startup, some CRR components can crash.
Check that the consumer exists before the metrics function uses it.
The probe server is now also started after the queue (as in ZENKO).
---
 bin/queuePopulator.js                          |  2 +-
 .../queueProcessor/QueueProcessor.js           | 16 +++++-----
 extensions/replication/queueProcessor/task.js  | 30 +++++++++----------
 .../replication/replayProcessor/task.js        | 30 +++++++++----------
 .../replicationStatusProcessor/task.js         |  2 +-
 5 files changed, 41 insertions(+), 39 deletions(-)

diff --git a/bin/queuePopulator.js b/bin/queuePopulator.js
index 52edcc601..0eba2630f 100644
--- a/bin/queuePopulator.js
+++ b/bin/queuePopulator.js
@@ -52,6 +52,7 @@ const queuePopulator = new QueuePopulator(
     zkConfig, kafkaConfig, qpConfig, httpsConfig, mConfig, rConfig, extConfigs);
 
 async.waterfall([
+    done => queuePopulator.open(done),
     done => startProbeServer(qpConfig.probeServer, (err, probeServer) => {
         if (err) {
             log.error('error starting probe server', {
@@ -73,7 +74,6 @@ async.waterfall([
         }
         done();
     }),
-    done => queuePopulator.open(done),
     done => {
         const taskState = {
             batchInProgress: false,
diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js
index 74c924df9..0d27270a0 100644
--- a/extensions/replication/queueProcessor/QueueProcessor.js
+++ b/extensions/replication/queueProcessor/QueueProcessor.js
@@ -576,13 +576,15 @@ class QueueProcessor extends EventEmitter {
 
         // consumer stats lag is on a different update cycle so we need to
        // update the metrics when requested
-        const lagStats = this._consumer.consumerStats.lag;
-        Object.keys(lagStats).forEach(partition => {
-            metricsHandler.lag({
-                partition,
-                serviceName: this.serviceName,
-            }, lagStats[partition]);
-        });
+        if (this._consumer) {
+            const lagStats = this._consumer.consumerStats.lag;
+            Object.keys(lagStats).forEach(partition => {
+                metricsHandler.lag({
+                    partition,
+                    serviceName: this.serviceName,
+                }, lagStats[partition]);
+            });
+        }
 
         const metrics = await promClient.register.metrics();
         res.writeHead(200, {
diff --git a/extensions/replication/queueProcessor/task.js b/extensions/replication/queueProcessor/task.js
index fb60db8db..2be3c250b 100644
--- a/extensions/replication/queueProcessor/task.js
+++ b/extensions/replication/queueProcessor/task.js
@@ -55,6 +55,21 @@ function getProbeConfig(queueProcessorConfig, site) {
 }
 
 async.waterfall([
+    done => {
+        metricsProducer.setupProducer(err => {
+            if (err) {
+                log.error('error starting metrics producer for queue processor', {
+                    error: err,
+                    method: 'MetricsProducer::setupProducer',
+                });
+            }
+            done(err);
+        });
+    },
+    done => {
+        queueProcessor.on('ready', done);
+        queueProcessor.start();
+    },
     done => startProbeServer(
         getProbeConfig(repConfig.queueProcessor, site),
         (err, probeServer) => {
@@ -79,21 +94,6 @@ async.waterfall([
             done();
         }
     ),
-    done => {
-        metricsProducer.setupProducer(err => {
-            if (err) {
-                log.error('error starting metrics producer for queue processor', {
-                    error: err,
-                    method: 'MetricsProducer::setupProducer',
-                });
-            }
-            done(err);
-        });
-    },
-    done => {
-        queueProcessor.on('ready', done);
-        queueProcessor.start();
-    },
 ], err => {
     if (err) {
         log.error('error during queue processor initialization', {
diff --git a/extensions/replication/replayProcessor/task.js b/extensions/replication/replayProcessor/task.js
index 22783c70a..028bd9de3 100644
--- a/extensions/replication/replayProcessor/task.js
+++ b/extensions/replication/replayProcessor/task.js
@@ -63,6 +63,21 @@ function getProbeConfig(replayProcessorConfig, site, topicName) {
 }
 
 async.waterfall([
+    done => {
+        metricsProducer.setupProducer(err => {
+            if (err) {
+                log.error('error starting metrics producer for queue processor', {
+                    error: err,
+                    method: 'MetricsProducer::setupProducer',
+                });
+            }
+            done(err);
+        });
+    },
+    done => {
+        queueProcessor.on('ready', done);
+        queueProcessor.start();
+    },
     done => startProbeServer(
         getProbeConfig(repConfig.replayProcessor, site, topic),
         (err, probeServer) => {
@@ -87,21 +102,6 @@ async.waterfall([
             done();
         }
     ),
-    done => {
-        metricsProducer.setupProducer(err => {
-            if (err) {
-                log.error('error starting metrics producer for queue processor', {
-                    error: err,
-                    method: 'MetricsProducer::setupProducer',
-                });
-            }
-            done(err);
-        });
-    },
-    done => {
-        queueProcessor.on('ready', done);
-        queueProcessor.start();
-    },
 ], err => {
     if (err) {
         log.error('error during queue processor initialization', {
diff --git a/extensions/replication/replicationStatusProcessor/task.js b/extensions/replication/replicationStatusProcessor/task.js
index 6ac8551fe..3fd287e0d 100644
--- a/extensions/replication/replicationStatusProcessor/task.js
+++ b/extensions/replication/replicationStatusProcessor/task.js
@@ -25,6 +25,7 @@ werelogs.configure({
 });
 
 async.waterfall([
+    done => replicationStatusProcessor.start(undefined, done),
     done => startProbeServer(
         repConfig.replicationStatusProcessor.probeServer,
         (err, probeServer) => {
@@ -49,7 +50,6 @@ async.waterfall([
             done();
         }
     ),
-    done => replicationStatusProcessor.start(undefined, done),
 ], err => {
     if (err) {
         log.error('error during queue processor initialization', {

From 617723b7a723bc364f92a6348fb756496bf3f595 Mon Sep 17 00:00:00 2001
From: Mickael Bourgois
Date: Tue, 5 Nov 2024 18:57:24 +0100
Subject: [PATCH 10/11] BB-624: Notification restart if destination down

If the destination is down at startup, QueueProcessor would ignore the
error and start connecting the consumer anyway. Now, if the destination
is down, the process stops and restarts so it can retry connecting to
the destination.
---
 .../destination/KafkaNotificationDestination.js          | 3 ++-
 extensions/notification/queueProcessor/QueueProcessor.js | 6 ++++--
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/extensions/notification/destination/KafkaNotificationDestination.js b/extensions/notification/destination/KafkaNotificationDestination.js
index 7b129afa7..5fa765caf 100644
--- a/extensions/notification/destination/KafkaNotificationDestination.js
+++ b/extensions/notification/destination/KafkaNotificationDestination.js
@@ -48,8 +48,9 @@ class KafkaNotificationDestination extends NotificationDestination {
                 this._log.info('error setting up kafka notif destination',
                     { error: err.message });
                 done(err);
+            } else {
+                done();
             }
-            done();
         });
     }
 
diff --git a/extensions/notification/queueProcessor/QueueProcessor.js b/extensions/notification/queueProcessor/QueueProcessor.js
index 099e3ae4d..d6de1f90b 100644
--- a/extensions/notification/queueProcessor/QueueProcessor.js
+++ b/extensions/notification/queueProcessor/QueueProcessor.js
@@ -124,7 +124,9 @@ class QueueProcessor extends EventEmitter {
             next => this._setupZookeeper(next),
             next => this._setupNotificationConfigManager(next),
             next => this._setupDestination(this.destinationConfig.type, next),
-            next => this._destination.init(() => {
+            // if connection to destination fails, process will stop & restart
+            next => this._destination.init(next),
+            next => {
                 if (options && options.disableConsumer) {
                     this.emit('ready');
                     return next();
@@ -160,7 +162,7 @@ class QueueProcessor extends EventEmitter {
                     return next();
                 });
                 return undefined;
-            }),
+            },
         ], err => {
             if (err) {
                 this.logger.info('error starting notification queue processor',

From af67f465a177e0d7f1b946744a5f2dd977b65a1f Mon Sep 17 00:00:00 2001
From: Mickael Bourgois
Date: Wed, 6 Nov 2024 02:16:42 +0100
Subject: [PATCH 11/11] BB-624: Test CRR metrics at startup crash

---
 tests/unit/replication/QueueProcessor.spec.js | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/tests/unit/replication/QueueProcessor.spec.js b/tests/unit/replication/QueueProcessor.spec.js
index a69b3aa2f..60f5b0278 100644
--- a/tests/unit/replication/QueueProcessor.spec.js
+++ b/tests/unit/replication/QueueProcessor.spec.js
@@ -178,5 +178,10 @@ describe('Queue Processor', () => {
             sinon.assert.calledOnceWithExactly(res.writeHead, 200, { 'Content-Type': promClient.register.contentType });
             sinon.assert.calledOnceWithExactly(res.end, fakeMetrics);
         });
+
+        it('should not crash without _consumer', async () => {
+            qp._consumer = undefined;
+            return assert.doesNotReject(async () => await qp.handleMetrics(res, log));
+        });
     });
 });