diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js
index 89a2d7440..27273a6e2 100644
--- a/lib/BackbeatConsumer.js
+++ b/lib/BackbeatConsumer.js
@@ -18,6 +18,8 @@ const CONCURRENCY_DEFAULT = 1;
 const CLIENT_ID = 'BackbeatConsumer';
 const { withTopicPrefix } = require('./util/topic');
 
+const MAX_POLL_INTERVAL_MS = 6000; // at least the broker's group.min.session.timeout.ms (6000 by default)
+
 /**
  * Stats on how we are consuming Kafka
  * @typedef {Object} ConsumerStats
@@ -132,6 +134,8 @@ class BackbeatConsumer extends EventEmitter {
         this._publishOffsetsCronActive = false;
         this._consumedEventTimeout = null;
 
+        this._lastConsumed = new Date();
+
         /** @type {ConsumerStats} */
         this.consumerStats = { lag: {} };
 
@@ -179,6 +183,8 @@ class BackbeatConsumer extends EventEmitter {
             'offset_commit_cb': this._onOffsetCommit.bind(this),
             // automatically create topic
             'allow.auto.create.topics': true,
+            'max.poll.interval.ms': MAX_POLL_INTERVAL_MS,
+            'session.timeout.ms': MAX_POLL_INTERVAL_MS,
         };
         const topicParams = {};
         if (this._fromOffset !== undefined) {
@@ -250,6 +256,37 @@ class BackbeatConsumer extends EventEmitter {
             setInterval(this._publishOffsetsCron.bind(this),
                         this._kafkaBacklogMetricsConfig.intervalS * 1000);
         }
+
+        if (!process.env.CONSUME_TO_MAINTAIN_MEMBERSHIP) {
+            this._pingInterval = null;
+            return;
+        }
+
+        this._pingInterval = setInterval(() => {
+            const lastConsumedAge = new Date() - this._lastConsumed;
+
+            if (lastConsumedAge >= MAX_POLL_INTERVAL_MS / 2) {
+                this._consumer.getMetadata({ topic: this._topic }, (err, md) => {
+                    if (err) {
+                        return;
+                    }
+
+                    const partitions = md.topics
+                        .filter(t => t.name === this._topic)
+                        .map(t => t.partitions)
+                        .flatMap(p => p.map(pp => ({
+                            topic: this._topic,
+                            partition: pp.id,
+                        })));
+
+                    this._consumer.pause(partitions); // paused partitions deliver no message to the ping below
+                    this._consumer.consume(1, (err, data) => { // polling resets the max.poll.interval.ms timer
+                        console.log('CONSUME_TO_MAINTAIN_MEMBERSHIP', { err, data, partitions });
+                        this._consumer.resume(partitions);
+                    });
+                });
+            }
+        }, MAX_POLL_INTERVAL_MS / 2);
     }
 
     isReady() {
@@ -389,7 +426,11 @@ class BackbeatConsumer extends EventEmitter {
                     entry.topic, entry.partition, entry.offset);
                 this._messagesConsumed++;
                 this._processingQueue.push(entry, (err, completionArgs) => {
-                    this._onEntryProcessingDone(err, entry, completionArgs);
+                    setTimeout(() => {
+                        console.log('CONSUME_TO_MAINTAIN_MEMBERSHIP adding delay');
+                        this._onEntryProcessingDone(err, entry, completionArgs);
+                        this._lastConsumed = new Date();
+                    }, MAX_POLL_INTERVAL_MS * 10); // artificial delay, well past max.poll.interval.ms
                 });
                 // update Zenko metrics with the latest consumed
                 // message timestamp, to later allow computing
@@ -397,11 +438,12 @@ class BackbeatConsumer extends EventEmitter {
                 KafkaBacklogMetrics.onMessageConsumed(
                     entry.topic, entry.partition, this._groupId,
                     entry.timestamp / 1000);
+                return undefined;
             });
         }
 
         if (err || entries.length === 0) {
-            this._log.debug(err ? `error, retry in 1s: ${err.message}` :
+            this._log.debug(err ? `error ${err.code}, retry in 1s: ${err.message}` :
                 'no message is available yet, retry in 1s',
                 { topic: this._topic, groupId: this._groupId });
             this._scheduleNextTryConsume();
@@ -748,6 +790,11 @@ class BackbeatConsumer extends EventEmitter {
             return setTimeout(() => this.close(cb), 1000);
         }
         this._circuitBreaker.stop();
+
+        if (this._pingInterval) {
+            clearInterval(this._pingInterval);
+        }
+
         return async.series([
             next => {
                 if (this._consumer) {
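
Context for reviewers: librdkafka evicts a consumer from its group when the application does not poll within `max.poll.interval.ms`, so the patch pauses every partition and issues a one-message `consume()` to reset that timer without delivering real work while an entry is still being processed. Below is a minimal standalone sketch of that keep-alive pattern, assuming node-rdkafka's `KafkaConsumer` API; the broker address, topic, group id, and the `keepAlivePing` name are illustrative, and it uses `assignments()` instead of the patch's `getMetadata()` lookup.

```js
const { KafkaConsumer } = require('node-rdkafka');

const MAX_POLL_INTERVAL_MS = 6000;
const PING_INTERVAL_MS = MAX_POLL_INTERVAL_MS / 2;

const consumer = new KafkaConsumer({
    'metadata.broker.list': 'localhost:9092', // assumed broker address
    'group.id': 'keepalive-demo',             // illustrative group id
    'max.poll.interval.ms': MAX_POLL_INTERVAL_MS,
    'session.timeout.ms': MAX_POLL_INTERVAL_MS,
}, {});

function keepAlivePing() {
    // Pause every currently-assigned partition so the poll below
    // cannot deliver a real message while processing is still busy.
    const assignments = consumer.assignments();
    if (assignments.length === 0) {
        return; // no assignment yet (first poll has not happened)
    }
    consumer.pause(assignments);
    // consume() drives librdkafka's poll loop; polling is what resets
    // the max.poll.interval.ms timer and keeps this member in the group.
    consumer.consume(1, () => {
        consumer.resume(assignments);
    });
}

consumer.connect();
consumer.on('ready', () => {
    consumer.subscribe(['demo-topic']); // illustrative topic
    setInterval(keepAlivePing, PING_INTERVAL_MS);
});
```

The patch itself gates the ping behind the `CONSUME_TO_MAINTAIN_MEMBERSHIP` environment variable and derives the partition list from topic metadata, which also covers partitions this member does not currently own; the 10x delay wrapped around `_onEntryProcessingDone` appears to be test instrumentation that forces processing to exceed `max.poll.interval.ms`, exercising the scenario the ping is meant to survive.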