From 4d7b9c8b21559197860e6d0ec5d79146978e977d Mon Sep 17 00:00:00 2001 From: Maha Benzekri Date: Thu, 7 Nov 2024 10:26:26 +0100 Subject: [PATCH] wip --- .../mongoProcessor/MongoQueueProcessor.js | 45 +++++++------------ 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js index 65665c837..e76194818 100644 --- a/extensions/mongoProcessor/MongoQueueProcessor.js +++ b/extensions/mongoProcessor/MongoQueueProcessor.js @@ -71,10 +71,8 @@ class MongoQueueProcessor { this._mongoClient = new MongoClient(this.mongoClientConfig); this._bucketMemState = new BucketMemState(Config); - } - /** * Start kafka consumer * @@ -82,18 +80,12 @@ class MongoQueueProcessor { */ start() { this.logger.info('starting mongo queue processor'); - async.series([ - next => this._mongoClient.setup(err => { - if (err) { - this.logger.error('could not connect to MongoDB', { - method: 'MongoQueueProcessor.start', - error: err.message, - }); - } - return next(err); - }), - ], error => { - if (error) { + this._mongoClient.setup(err => { + if (err) { + this.logger.error('could not connect to MongoDB', { + method: 'MongoQueueProcessor.start', + error: err.message, + }); this.logger.fatal('error starting mongo queue processor'); process.exit(1); } @@ -139,20 +131,17 @@ class MongoQueueProcessor { * @return {undefined} */ stop(done) { - async.parallel([ - next => { - if (this._consumer) { - this.logger.debug('closing kafka consumer', { - method: 'MongoQueueProcessor.stop', - }); - return this._consumer.close(next); - } - this.logger.debug('no kafka consumer to close', { - method: 'MongoQueueProcessor.stop', - }); - return next(); - }, - ], done); + if (this._consumer) { + this.logger.debug('closing kafka consumer', { + method: 'MongoQueueProcessor.stop', + }); + this._consumer.close(done); + } else { + this.logger.debug('no kafka consumer to close', { + method: 'MongoQueueProcessor.stop', + }); + done(); + } } _getZenkoObjectMetadata(log, entry, bucketInfo, done) {