diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js index be445b33de..baad57761d 100644 --- a/lib/BackbeatConsumer.js +++ b/lib/BackbeatConsumer.js @@ -179,7 +179,25 @@ class BackbeatConsumer extends EventEmitter { 'offset_commit_cb': this._onOffsetCommit.bind(this), // automatically create topic 'allow.auto.create.topics': true, - 'rebalance_cb': true, + 'rebalance_cb': (err, assignment) => { + if (err.code === kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) { + this._log.debug('rdkafka.assign', { err, assignment }); + this._consumer.assign(assignment); + } else if (err.code === kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) { + this._log.debug('rdkafka.revoke', { err }); + + if (!this._processingQueue || this._processingQueue.empty()) { + return this._consumer.unassign(); + } + + this._processingQueue.drain(() => { + this._log.debug('processing queue drained, un-assigning'); + this._consumer.unassign(); + }); + } else { + this._log.error('rdkafka.rebalance', { err, assignment }); + } + }, }; const topicParams = {}; if (this._fromOffset !== undefined) { @@ -206,15 +224,6 @@ class BackbeatConsumer extends EventEmitter { this._consumer.on('warning', warning => this._log.warn('rdkafka.warning', { warning })); this._consumer.on('event.error', err => this._log.error('rdkafka.error', { err })); this._consumer.on('event.throttle', throttle => this._log.info('rdkafka.throttle', { throttle })); - this._consumer.on('rebalance', (err, assignment) => { - if (err.code === kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) { - this._log.debug('rdkafka.assign', { err, assignment }); - } else if (err.code === kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) { - this._log.debug('rdkafka.revoke', { err }); - } else { - this._log.error('rdkafka.rebalance', { err, assignment }); - } - }); this._connect(); }