diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 000000000..3ad0b11c9
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,20 @@
+node_modules
+localData/*
+localMetadata/*
+.git
+.github
+.tox
+coverage
+.DS_Store
+
+.dockerignore
+docs/
+images/
+res/
+_config.yml
+.eslintrc
+.gitignore
+circle.yml
+DESIGN.md
+README.md
+Using.md
diff --git a/bin/queuePopulator.js b/bin/queuePopulator.js
index 52edcc601..0eba2630f 100644
--- a/bin/queuePopulator.js
+++ b/bin/queuePopulator.js
@@ -52,6 +52,7 @@ const queuePopulator = new QueuePopulator(
     zkConfig, kafkaConfig, qpConfig, httpsConfig, mConfig, rConfig, extConfigs);
 
 async.waterfall([
+    done => queuePopulator.open(done),
     done => startProbeServer(qpConfig.probeServer, (err, probeServer) => {
         if (err) {
             log.error('error starting probe server', {
@@ -73,7 +74,6 @@ async.waterfall([
         }
         done();
     }),
-    done => queuePopulator.open(done),
    done => {
        const taskState = {
            batchInProgress: false,
diff --git a/extensions/notification/destination/KafkaNotificationDestination.js b/extensions/notification/destination/KafkaNotificationDestination.js
index 7b129afa7..5fa765caf 100644
--- a/extensions/notification/destination/KafkaNotificationDestination.js
+++ b/extensions/notification/destination/KafkaNotificationDestination.js
@@ -48,8 +48,9 @@ class KafkaNotificationDestination extends NotificationDestination {
                 this._log.info('error setting up kafka notif destination', {
                     error: err.message });
                 done(err);
+            } else {
+                done();
             }
-            done();
         });
     }
 
diff --git a/extensions/notification/destination/KafkaProducer.js b/extensions/notification/destination/KafkaProducer.js
index 83a57129e..3aa358d32 100644
--- a/extensions/notification/destination/KafkaProducer.js
+++ b/extensions/notification/destination/KafkaProducer.js
@@ -72,12 +72,13 @@ class KafkaProducer extends EventEmitter {
             'request.timeout.ms': ACK_TIMEOUT,
         });
         this._ready = false;
-        this._producer.connect({}, error => {
+        this._producer.connect({ timeout: 60000 }, error => {
             if (error) {
                 this._log.info('error connecting to broker', {
                     error,
                     method: 'KafkaProducer.constructor',
                 });
+                this.emit('error', error);
             }
         });
         this._producer.on('ready', () => {
diff --git a/extensions/notification/queueProcessor/QueueProcessor.js b/extensions/notification/queueProcessor/QueueProcessor.js
index ece00ce39..d6de1f90b 100644
--- a/extensions/notification/queueProcessor/QueueProcessor.js
+++ b/extensions/notification/queueProcessor/QueueProcessor.js
@@ -116,17 +116,20 @@
      * @param {boolean} [options.disableConsumer] - true to disable
      * startup of consumer (for testing: one has to call
      * processQueueEntry() explicitly)
+     * @param {function} done - callback
      * @return {undefined}
      */
-    start(options) {
+    start(options, done) {
         async.series([
             next => this._setupZookeeper(next),
             next => this._setupNotificationConfigManager(next),
             next => this._setupDestination(this.destinationConfig.type, next),
-            next => this._destination.init(() => {
+            // if connection to destination fails, process will stop & restart
+            next => this._destination.init(next),
+            next => {
                 if (options && options.disableConsumer) {
                     this.emit('ready');
-                    return undefined;
+                    return next();
                 }
                 const { groupId, concurrency, logConsumerMetricsIntervalS }
                     = this.notifConfig.queueProcessor;
@@ -142,22 +145,31 @@
                 queueProcessor: this.processKafkaEntry.bind(this),
                 logConsumerMetricsIntervalS,
             });
-            this._consumer.on('error', () => { });
+            this._consumer.on('error', err => {
+                this.logger.error('error starting notification consumer',
+                    { method: 'QueueProcessor.start', error: err.message });
+                // crash if we got an error at startup
+                if (!this.isReady()) {
+                    return next(err);
+                }
+                return undefined;
+            });
             this._consumer.on('ready', () => {
                 this._consumer.subscribe();
                 this.logger.info('queue processor is ready to consume ' +
                     'notification entries');
                 this.emit('ready');
+                return next();
             });
-            return next();
-        }),
+            return undefined;
+        },
         ], err => {
             if (err) {
                 this.logger.info('error starting notification queue processor',
                     { error: err.message });
-                return undefined;
+                return done(err);
             }
-            return undefined;
+            return done();
         });
     }
 
@@ -261,6 +273,15 @@
             return done();
         }
     }
+
+    /**
+     * Checks if the queue processor is ready to consume
+     *
+     * @returns {boolean} whether the queue processor is ready
+     */
+    isReady() {
+        return this._consumer && this._consumer.isReady();
+    }
 }
 
 module.exports = QueueProcessor;
diff --git a/extensions/notification/queueProcessor/task.js b/extensions/notification/queueProcessor/task.js
index 4441f4326..9e159ae8c 100644
--- a/extensions/notification/queueProcessor/task.js
+++ b/extensions/notification/queueProcessor/task.js
@@ -24,7 +24,11 @@ try {
         'could not be found in destinations defined');
     const queueProcessor = new QueueProcessor(
         zkConfig, kafkaConfig, notifConfig, destinationConfig, destination);
-    queueProcessor.start();
+    queueProcessor.start(undefined, err => {
+        if (err) {
+            process.exit(1);
+        }
+    });
 } catch (err) {
     log.error('error starting notification queue processor task', {
         method: 'notification.task.queueProcessor.start',
diff --git a/extensions/replication/failedCRR/FailedCRRConsumer.js b/extensions/replication/failedCRR/FailedCRRConsumer.js
index f50d69b58..3841f694b 100644
--- a/extensions/replication/failedCRR/FailedCRRConsumer.js
+++ b/extensions/replication/failedCRR/FailedCRRConsumer.js
@@ -35,6 +35,7 @@ class FailedCRRConsumer {
      * @return {undefined}
      */
     start(cb) {
+        let consumerReady = false;
         const consumer = new BackbeatConsumer({
             kafka: {
                 hosts: this._kafkaConfig.hosts,
@@ -46,8 +47,17 @@
             queueProcessor: this.processKafkaEntry.bind(this),
             fetchMaxBytes: CONSUMER_FETCH_MAX_BYTES,
         });
-        consumer.on('error', () => {});
+        consumer.on('error', err => {
+            if (!consumerReady) {
+                this.logger.fatal('could not setup a backbeat consumer', {
+                    method: 'FailedCRRConsumer.start',
+                    error: err,
+                });
+                process.exit(1);
+            }
+        });
         consumer.on('ready', () => {
+            consumerReady = true;
             consumer.subscribe();
             this.logger.info('retry consumer is ready to consume entries');
         });
diff --git a/extensions/replication/failedCRR/FailedCRRProducer.js b/extensions/replication/failedCRR/FailedCRRProducer.js
index a48ea0281..01fcae9c7 100644
--- a/extensions/replication/failedCRR/FailedCRRProducer.js
+++ b/extensions/replication/failedCRR/FailedCRRProducer.js
@@ -29,7 +29,7 @@ class FailedCRRProducer {
             kafka: { hosts: this._kafkaConfig.hosts },
             topic: this._topic,
         });
-        this._producer.once('error', () => {});
+        this._producer.once('error', cb);
         this._producer.once('ready', () => {
             this._producer.removeAllListeners('error');
             this._producer.on('error', err =>
diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js
index d133b993f..0d27270a0 100644
--- a/extensions/replication/queueProcessor/QueueProcessor.js
+++ b/extensions/replication/queueProcessor/QueueProcessor.js
@@ -399,6 +399,7 @@ class QueueProcessor extends EventEmitter {
      */
     start(options) {
         this._setupProducer(err => {
+            let consumerReady = false;
             if (err) {
                 this.logger.info('error setting up kafka producer', {
                     error: err.message });
@@ -421,8 +422,15 @@
                 queueProcessor: this.processKafkaEntry.bind(this),
                 logConsumerMetricsIntervalS: this.repConfig.queueProcessor.logConsumerMetricsIntervalS,
             });
-            this._consumer.on('error', () => { });
+            this._consumer.on('error', () => {
+                if (!consumerReady) {
+                    this.logger.fatal('queue processor failed to start a ' +
+                        'backbeat consumer');
+                    process.exit(1);
+                }
+            });
             this._consumer.on('ready', () => {
+                consumerReady = true;
                 this._consumer.subscribe();
                 this.logger.info('queue processor is ready to consume ' +
                     `replication entries from ${this.topic}`);
@@ -568,13 +576,15 @@
 
         // consumer stats lag is on a different update cycle so we need to
         // update the metrics when requested
-        const lagStats = this._consumer.consumerStats.lag;
-        Object.keys(lagStats).forEach(partition => {
-            metricsHandler.lag({
-                partition,
-                serviceName: this.serviceName,
-            }, lagStats[partition]);
-        });
+        if (this._consumer) {
+            const lagStats = this._consumer.consumerStats.lag;
+            Object.keys(lagStats).forEach(partition => {
+                metricsHandler.lag({
+                    partition,
+                    serviceName: this.serviceName,
+                }, lagStats[partition]);
+            });
+        }
         const metrics = await promClient.register.metrics();
 
         res.writeHead(200, {
diff --git a/extensions/replication/queueProcessor/task.js b/extensions/replication/queueProcessor/task.js
index fb60db8db..2be3c250b 100644
--- a/extensions/replication/queueProcessor/task.js
+++ b/extensions/replication/queueProcessor/task.js
@@ -55,6 +55,21 @@ function getProbeConfig(queueProcessorConfig, site) {
 }
 
 async.waterfall([
+    done => {
+        metricsProducer.setupProducer(err => {
+            if (err) {
+                log.error('error starting metrics producer for queue processor', {
+                    error: err,
+                    method: 'MetricsProducer::setupProducer',
+                });
+            }
+            done(err);
+        });
+    },
+    done => {
+        queueProcessor.on('ready', done);
+        queueProcessor.start();
+    },
     done => startProbeServer(
         getProbeConfig(repConfig.queueProcessor, site),
         (err, probeServer) => {
@@ -79,21 +94,6 @@ async.waterfall([
             done();
         }
     ),
-    done => {
-        metricsProducer.setupProducer(err => {
-            if (err) {
-                log.error('error starting metrics producer for queue processor', {
-                    error: err,
-                    method: 'MetricsProducer::setupProducer',
-                });
-            }
-            done(err);
-        });
-    },
-    done => {
-        queueProcessor.on('ready', done);
-        queueProcessor.start();
-    },
 ], err => {
     if (err) {
         log.error('error during queue processor initialization', {
diff --git a/extensions/replication/replay/ReplayProducer.js b/extensions/replication/replay/ReplayProducer.js
index 716835c42..1d72d5549 100644
--- a/extensions/replication/replay/ReplayProducer.js
+++ b/extensions/replication/replay/ReplayProducer.js
@@ -29,7 +29,7 @@ class ReplayProducer {
             kafka: { hosts: this._kafkaConfig.hosts },
             topic: this._topic,
         });
-        this._producer.once('error', () => {});
+        this._producer.once('error', cb);
         this._producer.once('ready', () => {
             this._producer.removeAllListeners('error');
             this._producer.on('error', err =>
diff --git a/extensions/replication/replayProcessor/task.js b/extensions/replication/replayProcessor/task.js
index 22783c70a..028bd9de3 100644
--- a/extensions/replication/replayProcessor/task.js
+++ b/extensions/replication/replayProcessor/task.js
@@ -63,6 +63,21 @@ function getProbeConfig(replayProcessorConfig, site, topicName) {
 }
 
 async.waterfall([
+    done => {
+        metricsProducer.setupProducer(err => {
+            if (err) {
+                log.error('error starting metrics producer for queue processor', {
+                    error: err,
+                    method: 'MetricsProducer::setupProducer',
+                });
+            }
+            done(err);
+        });
+    },
+    done => {
+        queueProcessor.on('ready', done);
+        queueProcessor.start();
+    },
     done => startProbeServer(
         getProbeConfig(repConfig.replayProcessor, site, topic),
         (err, probeServer) => {
@@ -87,21 +102,6 @@ async.waterfall([
             done();
         }
     ),
-    done => {
-        metricsProducer.setupProducer(err => {
-            if (err) {
-                log.error('error starting metrics producer for queue processor', {
-                    error: err,
-                    method: 'MetricsProducer::setupProducer',
-                });
-            }
-            done(err);
-        });
-    },
-    done => {
-        queueProcessor.on('ready', done);
-        queueProcessor.start();
-    },
 ], err => {
     if (err) {
         log.error('error during queue processor initialization', {
diff --git a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
index bb3871f42..41d1d2a55 100644
--- a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
+++ b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js
@@ -315,6 +315,7 @@
      * @return {undefined}
      */
     start(options, cb) {
+        let consumerReady = false;
         this._FailedCRRProducer = new FailedCRRProducer(this.kafkaConfig);
         this._replayTopicNames.forEach(t => {
             this._ReplayProducers[t] = new ReplayProducer(this.kafkaConfig, t);
@@ -332,8 +333,14 @@
                 bootstrap: options && options.bootstrap,
                 logConsumerMetricsIntervalS: this.repConfig.replicationStatusProcessor.logConsumerMetricsIntervalS,
             });
-            this._consumer.on('error', () => { });
+            this._consumer.on('error', () => {
+                if (!consumerReady) {
+                    this.logger.fatal('error starting a backbeat consumer');
+                    process.exit(1);
+                }
+            });
             this._consumer.on('ready', () => {
+                consumerReady = true;
                 this.logger.info('replication status processor is ready to ' +
                     'consume replication status entries');
                 this._consumer.subscribe();
diff --git a/extensions/replication/replicationStatusProcessor/task.js b/extensions/replication/replicationStatusProcessor/task.js
index 6ac8551fe..3fd287e0d 100644
--- a/extensions/replication/replicationStatusProcessor/task.js
+++ b/extensions/replication/replicationStatusProcessor/task.js
@@ -25,6 +25,7 @@ werelogs.configure({
 });
 
 async.waterfall([
+    done => replicationStatusProcessor.start(undefined, done),
     done => startProbeServer(
         repConfig.replicationStatusProcessor.probeServer,
         (err, probeServer) => {
@@ -49,7 +50,6 @@ async.waterfall([
             done();
         }
     ),
-    done => replicationStatusProcessor.start(undefined, done),
 ], err => {
     if (err) {
         log.error('error during queue processor initialization', {
diff --git a/images/nodesvc-base/Dockerfile b/images/nodesvc-base/Dockerfile
index f9f770d5a..f9ca9cc92 100644
--- a/images/nodesvc-base/Dockerfile
+++ b/images/nodesvc-base/Dockerfile
@@ -8,15 +8,25 @@ RUN yarn cache clean \
     && yarn install --production --frozen-lockfile --ignore-optional --ignore-engines --network-concurrency 1 \
     && yarn cache clean
 
+# Somehow a chown in another layer replaces some node_modules built symlinks with copies of the files
+# To keep the same output as before
+RUN chown -R ${USER} /tmp/build/node_modules
+
 ################################################################################
 
 FROM ghcr.io/scality/federation/nodesvc-base:7.10.7.0
 
 WORKDIR ${HOME_DIR}/backbeat
+RUN chown -R ${USER} ${HOME_DIR}/backbeat
 
-COPY . ${HOME_DIR}/backbeat
-COPY --from=builder /tmp/build/node_modules ./node_modules/
+# Keep the same output as the chown command without a group (use group 0)
+COPY --chown=${USER}:0 . ${HOME_DIR}/backbeat
+COPY --from=builder --chown=${USER}:0 /tmp/build/node_modules ./node_modules/
 
-RUN chown -R ${USER} ${HOME_DIR}/backbeat
+# All copied files are already chowned scality(1000):root(0)
+# This removes a 763MB layer. Image goes from 2.34GB to 1.55GB
+# Besides, S3C Federation will chown scality(1001):scality(1001) at poststart
+# TODO S3C-9479: Images will be reworked for size optimization
+# RUN chown -R ${USER} ${HOME_DIR}/backbeat
 
 USER ${USER}
diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js
index 90227c340..f5925cb23 100644
--- a/lib/BackbeatConsumer.js
+++ b/lib/BackbeatConsumer.js
@@ -186,7 +186,12 @@ class BackbeatConsumer extends EventEmitter {
             consumerParams['client.rack'] = this._site;
         }
         this._consumer = new kafka.KafkaConsumer(consumerParams);
-        this._consumer.connect();
+        // S3C: use 1m, as zookeeper / kafka can start after backbeat and slowly
+        this._consumer.connect({ timeout: 60000 }, (err) => {
+            if (err) {
+                this.emit('error', err);
+            }
+        });
         return this._consumer.once('ready', () => {
             this._consumerReady = true;
             this._checkIfReady();
diff --git a/lib/BackbeatProducer.js b/lib/BackbeatProducer.js
index 9d77d70f0..57793ab58 100644
--- a/lib/BackbeatProducer.js
+++ b/lib/BackbeatProducer.js
@@ -69,7 +69,12 @@ class BackbeatProducer extends EventEmitter {
             'request.timeout.ms': ACK_TIMEOUT,
         });
         this._ready = false;
-        this._producer.connect();
+        // S3C: use 1m, as zookeeper / kafka can start after backbeat and slowly
+        this._producer.connect({ timeout: 60000 }, (err) => {
+            if (err) {
+                this.emit('error', err);
+            }
+        });
         this._producer.on('ready', () => {
             this._ready = true;
             this.emit('ready');
diff --git a/lib/MetricsConsumer.js b/lib/MetricsConsumer.js
index 48e5e28c0..ca629db0b 100644
--- a/lib/MetricsConsumer.js
+++ b/lib/MetricsConsumer.js
@@ -39,6 +39,7 @@ class MetricsConsumer {
     }
 
     start() {
+        let consumerReady = false;
         const consumer = new BackbeatConsumer({
             kafka: {
                 hosts: this.kafkaConfig.hosts,
@@ -50,8 +51,14 @@
             queueProcessor: this.processKafkaEntry.bind(this),
             fetchMaxBytes: CONSUMER_FETCH_MAX_BYTES,
         });
-        consumer.on('error', () => {});
+        consumer.on('error', () => {
+            if (!consumerReady) {
+                this.logger.fatal('error starting metrics consumer');
+                process.exit(1);
+            }
+        });
         consumer.on('ready', () => {
+            consumerReady = true;
             consumer.subscribe();
             this.logger.info('metrics processor is ready to consume entries');
         });
diff --git a/lib/clients/BackbeatClient.js b/lib/clients/BackbeatClient.js
index 2c0a18cf4..11e527588 100644
--- a/lib/clients/BackbeatClient.js
+++ b/lib/clients/BackbeatClient.js
@@ -1,6 +1,8 @@
 const AWS = require('aws-sdk');
 const Service = require('aws-sdk').Service;
 
+require('aws-sdk/lib/maintenance_mode_message').suppress = true;
+
 // for more info, see how S3 client is configured in aws-sdk
 // (clients/s3.js and lib/services/s3.js)
 
diff --git a/package.json b/package.json
index 07f6f0a07..78c9ca23f 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "backbeat",
-  "version": "7.10.17",
+  "version": "7.10.18",
   "description": "Asynchronous queue and job manager",
   "main": "index.js",
   "scripts": {
diff --git a/tests/unit/replication/QueueProcessor.spec.js b/tests/unit/replication/QueueProcessor.spec.js
index a69b3aa2f..60f5b0278 100644
--- a/tests/unit/replication/QueueProcessor.spec.js
+++ b/tests/unit/replication/QueueProcessor.spec.js
@@ -178,5 +178,10 @@ describe('Queue Processor', () => {
         sinon.assert.calledOnceWithExactly(res.writeHead, 200, {
             'Content-Type': promClient.register.contentType });
         sinon.assert.calledOnceWithExactly(res.end, fakeMetrics);
     });
+
+    it('should not crash without _consumer', async () => {
+        qp._consumer = undefined;
+        return assert.doesNotReject(async () => await qp.handleMetrics(res, log));
+    });
 });