
Migrate KafkaJS to ^2.0.0
kalyvasio committed Jun 26, 2024
1 parent 20657bd commit a7c9a54
Showing 7 changed files with 56 additions and 34 deletions.
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
```diff
@@ -95,7 +95,7 @@
     "dd-trace": "*",
     "growthbook": "npm:@growthbook/growthbook@*",
     "husky": "^3.0.9",
-    "kafkajs": "*",
+    "kafkajs": "^2.0.0",
     "lint-staged": "^9.4.3",
     "mocha": "^9.2.2",
     "mock-require": "^3.0.3",
@@ -120,7 +120,7 @@
     "newrelic": "*",
     "prom-client": "*",
     "pg": "*",
-    "kafkajs": "*",
+    "kafkajs": "^2.0.0",
     "axios": "*",
     "growthbook": "*"
   }
```
2 changes: 1 addition & 1 deletion src/initializers/kafka/index.ts
```diff
@@ -39,7 +39,7 @@ function updateSend(kafka: Kafka, config: { traceHeaderName: string }) {
     const traceHeaderName = config.traceHeaderName.toLowerCase();
     appendHeadersFromStore(message, getRequestContext(), config);
     if (!message.key && message.headers[traceHeaderName]) {
-      message.key = message.headers[traceHeaderName];
+      message.key = message.headers[traceHeaderName] as Buffer | string | null;
     }
   });
   const sent: KafkajsType.RecordMetadata[] = await originalSend.call(this, record);
```
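The cast is needed because the header and key types diverge in kafkajs 2.x, where a header value may now also be an array. A minimal sketch of the mismatch, assuming the ^2.0.0 typings (the header name and values are hypothetical):

```ts
import type { IHeaders, Message } from 'kafkajs';

// kafkajs 2.x types header values as Buffer | string | (Buffer | string)[] | undefined,
// while Message['key'] remains Buffer | string | null, so copying a header into the
// key no longer type-checks without the explicit cast used in updateSend above.
const headers: IHeaders = { 'x-trace-id': 'abc-123' };
const message: Message = { value: 'payload' };

message.key = headers['x-trace-id'] as Buffer | string | null;
```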
15 changes: 7 additions & 8 deletions src/initializers/kafka/kafka.ts
```diff
@@ -3,8 +3,7 @@ import requireInjected from '../../require-injected';
 import { KafkaConfig } from '../../typings/kafka';
 import type * as KafkajsType from 'kafkajs';
 import { flatten, isEmpty } from 'lodash';
-
-const { Kafka }: typeof KafkajsType = requireInjected('kafkajs');
+const { Kafka, Partitioners }: typeof KafkajsType = requireInjected('kafkajs');
 const logger = getLogger('orka.kafka');
 
 export default class OrkaKafka {
@@ -35,6 +34,7 @@
       authenticationTimeout
     });
 
+    if (options && !options.createPartitioner) options.createPartitioner = Partitioners.DefaultPartitioner;
     this.producer = this.produceClient.producer(options);
 
     const { CONNECT, DISCONNECT } = this.producer.events;
```
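kafkajs 2.x changed the default partitioner (the former JavaCompatiblePartitioner took over) and warns at producer creation unless a partitioner is configured explicitly or the KAFKAJS_NO_PARTITIONER_WARNING env variable is set, which is why the producer options above now pin `createPartitioner`. A minimal sketch of the two choices, with a hypothetical client id and broker:

```ts
import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({ clientId: 'orka-app', brokers: ['localhost:9092'] }); // hypothetical

// Opt in to the 2.x default (the former JavaCompatiblePartitioner); being explicit
// also suppresses the partitioner warning kafkajs 2.x logs when a producer is created.
const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });

// Or keep the 1.x key-to-partition mapping for topics that already rely on it.
const legacyProducer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
```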
```diff
@@ -109,11 +109,11 @@
     const renamings = await Promise.all(
       groupIds
         .map(async ({ groupId, topic, oldGroupId }) => {
-          const offsets = await admin.fetchOffsets({ groupId, topic, resolveOffsets: false });
-          if (offsets.every(({ offset }) => offset === '-1')) {
+          const offsets = (await admin.fetchOffsets({ groupId, topics: [topic], resolveOffsets: false }))[0];
+          if (offsets.partitions.every(({ offset }) => offset === '-1')) {
             // groupId is not configured
-            const oldOffsets = await admin.fetchOffsets({ groupId: oldGroupId, topic, resolveOffsets: false });
-            const knownOffsets = oldOffsets?.filter(o => o.offset !== '-1');
+            const oldOffsets = (await admin.fetchOffsets({ groupId: oldGroupId, topics: [topic], resolveOffsets: false }))[0];
+            const knownOffsets = oldOffsets.partitions.filter(o => o.offset !== '-1');
             if (!isEmpty(knownOffsets)) await admin.setOffsets({ groupId, topic, partitions: knownOffsets });
             return { groupId, renamedFrom: oldGroupId, topic, oldOffsets: knownOffsets };
           } else {
```
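`admin.fetchOffsets` also changed shape in 2.x: it takes a `topics` array instead of a single `topic` and resolves per topic, hence the `[0]` indexing and the `.partitions` access above. A sketch of the new call, with a hypothetical broker, group, and topic:

```ts
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] }); // hypothetical broker

// 1.x: fetchOffsets({ groupId, topic })  -> Array<{ partition, offset }>
// 2.x: fetchOffsets({ groupId, topics }) -> Array<{ topic, partitions: [...] }>
async function groupHasNoCommittedOffsets(groupId: string, topic: string): Promise<boolean> {
  const admin = kafka.admin();
  await admin.connect();
  try {
    const [topicOffsets] = await admin.fetchOffsets({ groupId, topics: [topic], resolveOffsets: false });
    // An offset of '-1' still marks a partition with no committed offset for the group.
    return topicOffsets.partitions.every(({ offset }) => offset === '-1');
  } finally {
    await admin.disconnect();
  }
}
```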
```diff
@@ -179,7 +179,6 @@ function getAuthOptions(options: {
 }) {
   const { key, cert, ca } = options.certificates || {};
   if (key && cert && ca) return { ssl: { ...options.certificates, ca: flatten([ca]) } };
-
   const { username, password } = options.sasl || {};
-  if (username && password) return { sasl: options.sasl, ssl: options.ssl };
+  if (username && password) return { sasl: { ...options.sasl }, ssl: options.ssl };
 }
```
20 changes: 18 additions & 2 deletions src/typings/kafka.d.ts
```diff
@@ -6,7 +6,15 @@ export interface KafkaConfig {
     rejectUnauthorized: boolean;
   };
   sasl?: {
-    mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
+    mechanism: 'plain';
+    username: string;
+    password: string;
+  } | {
+    mechanism: 'scram-sha-256';
+    username: string;
+    password: string;
+  } | {
+    mechanism: 'scram-sha-512';
     username: string;
     password: string;
   };
@@ -24,7 +32,15 @@
   };
   ssl?: boolean;
   sasl?: {
-    mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
+    mechanism: 'plain';
+    username: string;
+    password: string;
+  } | {
+    mechanism: 'scram-sha-256';
+    username: string;
+    password: string;
+  } | {
+    mechanism: 'scram-sha-512';
     username: string;
     password: string;
   };
```
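The three-way split mirrors kafkajs 2.x, which types SASL options per mechanism as a discriminated union rather than with one broad `mechanism` union. A sketch of the shape the config now lines up with, using hypothetical credentials:

```ts
import type { SASLOptions } from 'kafkajs';

// Each union member pins `mechanism` to a single literal so TypeScript can narrow
// the remaining fields per mechanism, matching the kafkajs 2.x typings.
const sasl: SASLOptions = {
  mechanism: 'scram-sha-512',
  username: 'svc-user', // hypothetical
  password: 'secret'    // hypothetical
};
```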
2 changes: 1 addition & 1 deletion src/utils.ts
```diff
@@ -25,7 +25,7 @@ export const nodeVersionGreaterThanEqual = (requestedVersion: string, version =
 };
 
 export function appendHeadersFromStore(
-  properties: { headers?: Record<string, string | Buffer | undefined> },
+  properties: { headers?: { [key: string]: Buffer | string | (Buffer | string)[] | undefined } },
   store: Map<string, string | Record<string, string>>,
   config: Record<string, any>
 ) {
```
31 changes: 19 additions & 12 deletions test/initializers/kafka/kafka.test.ts
```diff
@@ -1,6 +1,8 @@
 import * as sinon from 'sinon';
 import 'should';
 import type KafkaType from '../../../src/initializers/kafka/kafka';
+import { Partitioners } from 'kafkajs';
+
 const sandbox = sinon.createSandbox();
 const mock = require('mock-require');
@@ -50,7 +52,7 @@ describe('kafka class', () => {
     kafkaStubReturn = { producer: () => producerStub, consumer: consumerStub, admin: adminStub };
     kafkaStub = sinon.stub().returns(kafkaStubReturn);
     delete require.cache[require.resolve('../../../src/initializers/kafka/kafka')];
-    mock('kafkajs', { Kafka: kafkaStub });
+    mock('kafkajs', { Kafka: kafkaStub, Partitioners });
     ({ default: Kafka } = await import('../../../src/initializers/kafka/kafka'));
   });
```
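Since kafka.ts now destructures `Partitioners` from the injected kafkajs module, the mock must pass the real export through. A sketch of the failure mode otherwise, with stub names as in this test file:

```ts
import * as sinon from 'sinon';
import { Partitioners } from 'kafkajs';
const mock = require('mock-require');

const kafkaStub = sinon.stub();

// A 1.x-era mock leaves Partitioners undefined inside kafka.ts, so producer setup
// would throw a TypeError when it reads Partitioners.DefaultPartitioner:
mock('kafkajs', { Kafka: kafkaStub });

// Passing the genuine Partitioners through keeps the Kafka client stubbed while
// the default-partitioner assignment still resolves:
mock('kafkajs', { Kafka: kafkaStub, Partitioners });
```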

```diff
@@ -358,14 +360,19 @@
         },
         ssl: true
       });
-      fetchOffsetsStub.onFirstCall().returns([{ partition: 0, offset: '-1' }]);
-      fetchOffsetsStub.onSecondCall().returns([{ partition: 0, offset: '5' }]);
-      fetchOffsetsStub.onThirdCall().returns([{ partition: 0, offset: '-1' }]);
+      fetchOffsetsStub.onFirstCall().returns([{ topic: 'topic', partitions: [{ partition: 0, offset: '-1' }] }]);
+      fetchOffsetsStub.onSecondCall().returns([{ topic: 'topic2', partitions: [{ partition: 0, offset: '5' }] }]);
+      fetchOffsetsStub.onThirdCall().returns([{ topic: 'topic3', partitions: [{ partition: 0, offset: '-1' }] }]);
 
       fetchOffsetsStub.onCall(3).returns([
-        { partition: 0, offset: '3' },
-        { partition: 1, offset: '-1' }
+        {
+          partitions: [
+            { partition: 0, offset: '3' },
+            { partition: 1, offset: '-1' }
+          ]
+        }
       ]);
-      fetchOffsetsStub.returns([{ partition: 0, offset: '-1' }]);
+      fetchOffsetsStub.returns([{ partitions: [{ partition: 0, offset: '-1' }] }]);
       const response = await kafka.renameGroupId([
         { groupId: 'newGroupId', topic: 'topic', oldGroupId: 'oldGroupId' },
         { groupId: 'newGroupId2', topic: 'topic2', oldGroupId: 'oldGroupId2' },
@@ -374,11 +381,11 @@
 
       adminStub.args.should.eql([[]]);
       fetchOffsetsStub.args.should.eql([
-        [{ groupId: 'newGroupId', topic: 'topic', resolveOffsets: false }],
-        [{ groupId: 'newGroupId2', topic: 'topic2', resolveOffsets: false }],
-        [{ groupId: 'newGroupId3', topic: 'topic3', resolveOffsets: false }],
-        [{ groupId: 'oldGroupId', topic: 'topic', resolveOffsets: false }],
-        [{ groupId: 'oldGroupId3', topic: 'topic3', resolveOffsets: false }]
+        [{ groupId: 'newGroupId', topics: ['topic'], resolveOffsets: false }],
+        [{ groupId: 'newGroupId2', topics: ['topic2'], resolveOffsets: false }],
+        [{ groupId: 'newGroupId3', topics: ['topic3'], resolveOffsets: false }],
+        [{ groupId: 'oldGroupId', topics: ['topic'], resolveOffsets: false }],
+        [{ groupId: 'oldGroupId3', topics: ['topic3'], resolveOffsets: false }]
       ]);
       setOffsetsStub.args.should.eql([
        [{ groupId: 'newGroupId', partitions: [{ offset: '3', partition: 0 }], topic: 'topic' }]
```
