Skip to content

Commit

Permalink
allow passing compressionType and requiredAcks for Kafka notification…
Browse files Browse the repository at this point in the history
… destinations

Issue: BB-620
  • Loading branch information
Kerkesni committed Nov 7, 2024
1 parent 6694ca8 commit 0bb00c4
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 5 deletions.
10 changes: 10 additions & 0 deletions extensions/notification/NotificationConfigValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ const destinationSchema = joi.object({
internalTopic: joi.string(),
topic: joi.string().required(),
auth: authSchema.default({}),
requiredAcks: joi.number().when('type', {
is: joi.string().not('kafka'),
then: joi.forbidden(),
otherwise: joi.number().default(1),
}),
compressionType: joi.string().when('type', {
is: joi.string().not('kafka'),
then: joi.forbidden(),
otherwise: joi.string().default('none'),
}),
});

const joiSchema = joi.object({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ class KafkaNotificationDestination extends NotificationDestination {
}

_setupProducer(done) {
const { host, port, topic, pollIntervalMs, auth } = this._destinationConfig;
const { host, port, topic, pollIntervalMs, auth, requiredAcks, compressionType } = this._destinationConfig;
const producer = new KafkaProducer({
kafka: { hosts: `${host}:${port}` },
topic,
pollIntervalMs,
auth,
compressionType,
requiredAcks,
});
producer.once('error', done);
producer.once('ready', () => {
Expand Down
4 changes: 0 additions & 4 deletions extensions/notification/destination/KafkaProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ class KafkaProducer extends BackbeatProducer {
return 'NotificationProducer';
}

getRequireAcks() {
return 1;
}

setFromConfig(joiResult) {
super.setFromConfig(joiResult);
this._auth = joiResult.auth ? authUtil.generateKafkaAuthObject(joiResult.auth) : {};
Expand Down
81 changes: 81 additions & 0 deletions tests/unit/notification/NotificationConfigValidator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
const assert = require('assert');

const configValidator = require('../../../extensions/notification/NotificationConfigValidator');

describe('NotificationConfigValidator ::', () => {
it('should throw an error if requiredAcks is specified for a non-kafka destination', () => {
const extConfig = {
topic: 'topic',
monitorNotificationFailures: true,
notificationFailedTopic: 'failed-topic',
queueProcessor: {
groupId: 'groupId',
concurrency: 1000,
},
destinations: [{
resource: 'resource',
type: 'other',
host: 'host',
port: 8000,
topic: 'topic',
requiredAcks: 1,
}],
probeServer: {
bindAddress: 'localhost',
port: 8000,
},
};
assert.throws(() => configValidator(null, extConfig));
});

it('should throw an error if compressionType is specified for a non-kafka destination', () => {
const extConfig = {
topic: 'topic',
monitorNotificationFailures: true,
notificationFailedTopic: 'failed-topic',
queueProcessor: {
groupId: 'groupId',
concurrency: 1000,
},
destinations: [{
resource: 'resource',
type: 'other',
host: 'host',
port: 8000,
topic: 'topic',
compressionType: 'none',
}],
probeServer: {
bindAddress: 'localhost',
port: 8000,
},
};
assert.throws(() => configValidator(null, extConfig));
});

it('should not throw an error if requiredAcks and compressionType is specified for a kafka destination', () => {
const extConfig = {
topic: 'topic',
monitorNotificationFailures: true,
notificationFailedTopic: 'failed-topic',
queueProcessor: {
groupId: 'groupId',
concurrency: 1000,
},
destinations: [{
resource: 'resource',
type: 'kafka',
host: 'host',
port: 8000,
topic: 'topic',
requiredAcks: 1,
compressionType: 'none',
}],
probeServer: {
bindAddress: 'localhost',
port: 8000,
},
};
assert.doesNotThrow(() => configValidator(null, extConfig));
});
});

0 comments on commit 0bb00c4

Please sign in to comment.