Skip to content

Commit

Permalink
simulate extra long processing
Browse files Browse the repository at this point in the history
  • Loading branch information
rachedbenmustapha committed Oct 6, 2023
1 parent 2ad5256 commit cd85a1c
Showing 1 changed file with 49 additions and 2 deletions.
51 changes: 49 additions & 2 deletions lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const CONCURRENCY_DEFAULT = 1;
const CLIENT_ID = 'BackbeatConsumer';
const { withTopicPrefix } = require('./util/topic');

const MAX_POLL_INTERVAL_MS = 6000; // higher than broker's group.min.session.timeout.ms

/**
* Stats on how we are consuming Kafka
* @typedef {Object} ConsumerStats
Expand Down Expand Up @@ -132,6 +134,8 @@ class BackbeatConsumer extends EventEmitter {
this._publishOffsetsCronActive = false;
this._consumedEventTimeout = null;

this._lastConsumed = new Date();

/** @type {ConsumerStats} */
this.consumerStats = { lag: {} };

Expand Down Expand Up @@ -179,6 +183,8 @@ class BackbeatConsumer extends EventEmitter {
'offset_commit_cb': this._onOffsetCommit.bind(this),
// automatically create topic
'allow.auto.create.topics': true,
'max.poll.interval.ms': MAX_POLL_INTERVAL_MS,
'session.timeout.ms': MAX_POLL_INTERVAL_MS,
};
const topicParams = {};
if (this._fromOffset !== undefined) {
Expand Down Expand Up @@ -250,6 +256,37 @@ class BackbeatConsumer extends EventEmitter {
setInterval(this._publishOffsetsCron.bind(this),
this._kafkaBacklogMetricsConfig.intervalS * 1000);
}

if (!process.env.CONSUME_TO_MAINTAIN_MEMBERSHIP) {
this._pingInterval = null;
return;
}

this._pingInterval = setInterval(() => {
const lastCommitAge = new Date() - this._lastConsumed;

if (lastCommitAge >= MAX_POLL_INTERVAL_MS/2) {

Check failure on line 268 in lib/BackbeatConsumer.js

View workflow job for this annotation

GitHub Actions / tests

Operator '/' must be spaced
this._consumer.getMetadata({ topic: this._topic }, (err, md) => {
if (err) {
return;
}

const partitions = md.topics
.filter(t => t.name === this._topic)
.map(t => t.partitions)
.flatMap(p => p.map(pp => ({
topic: this._topic,
partition: pp.id,
})));

this._consumer.pause(partitions);
this._consumer.consume(1, (err, data) => {
console.log('CONSUME_TO_MAINTAIN_MEMBERSHIP', { err, data, partitions })

Check failure on line 284 in lib/BackbeatConsumer.js

View workflow job for this annotation

GitHub Actions / tests

Unexpected console statement

Check failure on line 284 in lib/BackbeatConsumer.js

View workflow job for this annotation

GitHub Actions / tests

Missing semicolon
this._consumer.resume(partitions);
});
});
}
}, MAX_POLL_INTERVAL_MS/2);

Check failure on line 289 in lib/BackbeatConsumer.js

View workflow job for this annotation

GitHub Actions / tests

Operator '/' must be spaced
}

isReady() {
Expand Down Expand Up @@ -389,19 +426,24 @@ class BackbeatConsumer extends EventEmitter {
entry.topic, entry.partition, entry.offset);
this._messagesConsumed++;
this._processingQueue.push(entry, (err, completionArgs) => {
this._onEntryProcessingDone(err, entry, completionArgs);
setTimeout(() => {
console.log('CONSUME_TO_MAINTAIN_MEMBERSHIP adding delay')

Check failure on line 430 in lib/BackbeatConsumer.js

View workflow job for this annotation

GitHub Actions / tests

Unexpected console statement

Check failure on line 430 in lib/BackbeatConsumer.js

View workflow job for this annotation

GitHub Actions / tests

Missing semicolon
this._onEntryProcessingDone(err, entry, completionArgs);
this._lastConsumed = new Date();
}, MAX_POLL_INTERVAL_MS * 10);
});
// update Zenko metrics with the latest consumed
// message timestamp, to later allow computing
// backlog metrics on demand
KafkaBacklogMetrics.onMessageConsumed(
entry.topic, entry.partition, this._groupId,
entry.timestamp / 1000);

return undefined;
});
}
if (err || entries.length === 0) {
this._log.debug(err ? `error, retry in 1s: ${err.message}` :
this._log.debug(err ? `error ${err.code} retry in 1s: ${err.message}` :
'no message is available yet, retry in 1s',
{ topic: this._topic, groupId: this._groupId });
this._scheduleNextTryConsume();
Expand Down Expand Up @@ -748,6 +790,11 @@ class BackbeatConsumer extends EventEmitter {
return setTimeout(() => this.close(cb), 1000);
}
this._circuitBreaker.stop();

if (this._pingInterval) {
clearInterval(this._pingInterval);
}

return async.series([
next => {
if (this._consumer) {
Expand Down

0 comments on commit cd85a1c

Please sign in to comment.