Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
benzekrimaha committed Nov 7, 2024
1 parent 185bf75 commit 4d7b9c8
Showing 1 changed file with 17 additions and 28 deletions.
45 changes: 17 additions & 28 deletions extensions/mongoProcessor/MongoQueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,21 @@ class MongoQueueProcessor {
this._mongoClient = new MongoClient(this.mongoClientConfig);
this._bucketMemState = new BucketMemState(Config);


}


/**
* Start kafka consumer
*
* @return {undefined}
*/
start() {
this.logger.info('starting mongo queue processor');
async.series([
next => this._mongoClient.setup(err => {
if (err) {
this.logger.error('could not connect to MongoDB', {
method: 'MongoQueueProcessor.start',
error: err.message,
});
}
return next(err);
}),
], error => {
if (error) {
this._mongoClient.setup(err => {
if (err) {
this.logger.error('could not connect to MongoDB', {
method: 'MongoQueueProcessor.start',
error: err.message,
});
this.logger.fatal('error starting mongo queue processor');
process.exit(1);
}
Expand Down Expand Up @@ -139,20 +131,17 @@ class MongoQueueProcessor {
* @return {undefined}
*/
stop(done) {
async.parallel([
next => {
if (this._consumer) {
this.logger.debug('closing kafka consumer', {
method: 'MongoQueueProcessor.stop',
});
return this._consumer.close(next);
}
this.logger.debug('no kafka consumer to close', {
method: 'MongoQueueProcessor.stop',
});
return next();
},
], done);
if (this._consumer) {
this.logger.debug('closing kafka consumer', {
method: 'MongoQueueProcessor.stop',
});
this._consumer.close(done);
} else {
this.logger.debug('no kafka consumer to close', {
method: 'MongoQueueProcessor.stop',
});
done();
}
}

_getZenkoObjectMetadata(log, entry, bucketInfo, done) {
Expand Down

0 comments on commit 4d7b9c8

Please sign in to comment.