From a7c9a548ba9cab1619ca36cbf186d036b7c1051a Mon Sep 17 00:00:00 2001
From: Ioannis Kalyvas
Date: Thu, 20 Jun 2024 17:15:51 +0300
Subject: [PATCH] Migrate KafkaJS to ^2.0.0

---
 package-lock.json                     | 16 +++++++-------
 package.json                          |  4 ++--
 src/initializers/kafka/index.ts       |  2 +-
 src/initializers/kafka/kafka.ts       | 15 ++++++-------
 src/typings/kafka.d.ts                | 20 +++++++++++++++--
 src/utils.ts                          |  2 +-
 test/initializers/kafka/kafka.test.ts | 31 ++++++++++++++++-----------
 7 files changed, 56 insertions(+), 34 deletions(-)

diff --git a/package-lock.json b/package-lock.json
index 72049a8a..763887bf 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -52,7 +52,7 @@
         "dd-trace": "*",
         "growthbook": "npm:@growthbook/growthbook@*",
         "husky": "^3.0.9",
-        "kafkajs": "*",
+        "kafkajs": "^2.0.0",
         "lint-staged": "^9.4.3",
         "mocha": "^9.2.2",
         "mock-require": "^3.0.3",
@@ -4354,12 +4354,12 @@
       }
     },
     "node_modules/kafkajs": {
-      "version": "1.15.0",
-      "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-1.15.0.tgz",
-      "integrity": "sha512-yjPyEnQCkPxAuQLIJnY5dI+xnmmgXmhuOQ1GVxClG5KTOV/rJcW1qA3UfvyEJKTp/RTSqQnUR3HJsKFvHyTpNg==",
+      "version": "2.2.4",
+      "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz",
+      "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==",
       "dev": true,
       "engines": {
-        "node": ">=10.13.0"
+        "node": ">=14.0.0"
       }
     },
     "node_modules/kareem": {
@@ -11991,9 +11991,9 @@
       }
     },
     "kafkajs": {
-      "version": "1.15.0",
-      "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-1.15.0.tgz",
-      "integrity": "sha512-yjPyEnQCkPxAuQLIJnY5dI+xnmmgXmhuOQ1GVxClG5KTOV/rJcW1qA3UfvyEJKTp/RTSqQnUR3HJsKFvHyTpNg==",
+      "version": "2.2.4",
+      "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz",
+      "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==",
       "dev": true
     },
     "kareem": {
diff --git a/package.json b/package.json
index fefcc96f..8acf34c6 100644
--- a/package.json
+++ b/package.json
@@ -95,7 +95,7 @@
     "dd-trace": "*",
     "growthbook": "npm:@growthbook/growthbook@*",
     "husky": "^3.0.9",
-    "kafkajs": "*",
+    "kafkajs": "^2.0.0",
     "lint-staged": "^9.4.3",
     "mocha": "^9.2.2",
     "mock-require": "^3.0.3",
@@ -120,7 +120,7 @@
     "newrelic": "*",
     "prom-client": "*",
     "pg": "*",
-    "kafkajs": "*",
+    "kafkajs": "^2.0.0",
     "axios": "*",
     "growthbook": "*"
   }
diff --git a/src/initializers/kafka/index.ts b/src/initializers/kafka/index.ts
index 5467ff71..1c6687ab 100644
--- a/src/initializers/kafka/index.ts
+++ b/src/initializers/kafka/index.ts
@@ -39,7 +39,7 @@ function updateSend(kafka: Kafka, config: { traceHeaderName: string }) {
       const traceHeaderName = config.traceHeaderName.toLowerCase();
       appendHeadersFromStore(message, getRequestContext(), config);
       if (!message.key && message.headers[traceHeaderName]) {
-        message.key = message.headers[traceHeaderName];
+        message.key = message.headers[traceHeaderName] as Buffer | string | null;
       }
     });
     const sent: KafkajsType.RecordMetadata[] = await originalSend.call(this, record);
diff --git a/src/initializers/kafka/kafka.ts b/src/initializers/kafka/kafka.ts
index d0d89561..85a02ae0 100644
--- a/src/initializers/kafka/kafka.ts
+++ b/src/initializers/kafka/kafka.ts
@@ -3,8 +3,7 @@ import requireInjected from '../../require-injected';
 import { KafkaConfig } from '../../typings/kafka';
 import type * as KafkajsType from 'kafkajs';
 import { flatten, isEmpty } from 'lodash';
-
-const { Kafka }: typeof KafkajsType = requireInjected('kafkajs');
+const { Kafka, Partitioners }: typeof KafkajsType = requireInjected('kafkajs');
 const logger = getLogger('orka.kafka');
 
 export default class OrkaKafka {
@@ -35,6 +34,7 @@ export default class OrkaKafka {
       authenticationTimeout
     });
 
+    if (options && !options.createPartitioner) options.createPartitioner = Partitioners.DefaultPartitioner;
     this.producer = this.produceClient.producer(options);
 
     const { CONNECT, DISCONNECT } = this.producer.events;
@@ -109,11 +109,11 @@ export default class OrkaKafka {
     const renamings = await Promise.all(
       groupIds
         .map(async ({ groupId, topic, oldGroupId }) => {
-          const offsets = await admin.fetchOffsets({ groupId, topic, resolveOffsets: false });
-          if (offsets.every(({ offset }) => offset === '-1')) {
+          const offsets = (await admin.fetchOffsets({ groupId, topics: [topic], resolveOffsets: false }))[0];
+          if (offsets.partitions.every(({ offset }) => offset === '-1')) {
             // groupId is not configured
-            const oldOffsets = await admin.fetchOffsets({ groupId: oldGroupId, topic, resolveOffsets: false });
-            const knownOffsets = oldOffsets?.filter(o => o.offset !== '-1');
+            const oldOffsets = (await admin.fetchOffsets({ groupId: oldGroupId, topics: [topic], resolveOffsets: false }))[0];
+            const knownOffsets = oldOffsets.partitions.filter(o => o.offset !== '-1');
             if (!isEmpty(knownOffsets)) await admin.setOffsets({ groupId, topic, partitions: knownOffsets });
             return { groupId, renamedFrom: oldGroupId, topic, oldOffsets: knownOffsets };
           } else {
@@ -179,7 +179,6 @@ function getAuthOptions(options: {
 }) {
   const { key, cert, ca } = options.certificates || {};
   if (key && cert && ca) return { ssl: { ...options.certificates, ca: flatten([ca]) } };
-
   const { username, password } = options.sasl || {};
-  if (username && password) return { sasl: options.sasl, ssl: options.ssl };
+  if (username && password) return { sasl: { ...options.sasl }, ssl: options.ssl };
 }
diff --git a/src/typings/kafka.d.ts b/src/typings/kafka.d.ts
index f66d987b..2599581e 100644
--- a/src/typings/kafka.d.ts
+++ b/src/typings/kafka.d.ts
@@ -6,7 +6,15 @@ export interface KafkaConfig {
     rejectUnauthorized: boolean;
   };
   sasl?: {
-    mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
+    mechanism: 'plain';
+    username: string;
+    password: string;
+  } | {
+    mechanism: 'scram-sha-256';
+    username: string;
+    password: string;
+  } | {
+    mechanism: 'scram-sha-512';
     username: string;
     password: string;
   };
@@ -24,7 +32,15 @@ export interface KafkaConfig {
   };
   ssl?: boolean;
   sasl?: {
-    mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
+    mechanism: 'plain';
+    username: string;
+    password: string;
+  } | {
+    mechanism: 'scram-sha-256';
+    username: string;
+    password: string;
+  } | {
+    mechanism: 'scram-sha-512';
     username: string;
     password: string;
   };
diff --git a/src/utils.ts b/src/utils.ts
index 4a6a60ff..13a17bb9 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -25,7 +25,7 @@ export const nodeVersionGreaterThanEqual = (requestedVersion: string, version =
 };
 
 export function appendHeadersFromStore(
-  properties: { headers?: Record<string, string | Buffer> },
+  properties: { headers?: { [key: string]: Buffer | string | (Buffer | string)[] | undefined } },
   store: Map<string, Map<string, string>>,
   config: Record<string, any>
 ) {
diff --git a/test/initializers/kafka/kafka.test.ts b/test/initializers/kafka/kafka.test.ts
index a31c9457..95f22308 100644
--- a/test/initializers/kafka/kafka.test.ts
+++ b/test/initializers/kafka/kafka.test.ts
@@ -1,6 +1,8 @@
 import * as sinon from 'sinon';
 import 'should';
 import type KafkaType from '../../../src/initializers/kafka/kafka';
+import { Partitioners } from 'kafkajs';
+
 const sandbox = sinon.createSandbox();
 const mock = require('mock-require');
 
@@ -50,7 +52,7 @@ describe('kafka class', () => {
     kafkaStubReturn = { producer: () => producerStub, consumer: consumerStub, admin: adminStub };
     kafkaStub = sinon.stub().returns(kafkaStubReturn);
     delete require.cache[require.resolve('../../../src/initializers/kafka/kafka')];
-    mock('kafkajs', { Kafka: kafkaStub });
+    mock('kafkajs', { Kafka: kafkaStub, Partitioners });
     ({ default: Kafka } = await import('../../../src/initializers/kafka/kafka'));
   });
 
@@ -358,14 +360,19 @@
         },
         ssl: true
       });
-      fetchOffsetsStub.onFirstCall().returns([{ partition: 0, offset: '-1' }]);
-      fetchOffsetsStub.onSecondCall().returns([{ partition: 0, offset: '5' }]);
-      fetchOffsetsStub.onThirdCall().returns([{ partition: 0, offset: '-1' }]);
+      fetchOffsetsStub.onFirstCall().returns([{ topic: 'topic', partitions: [{ partition: 0, offset: '-1' }] }]);
+      fetchOffsetsStub.onSecondCall().returns([{ topic: 'topic2', partitions: [{ partition: 0, offset: '5' }] }]);
+      fetchOffsetsStub.onThirdCall().returns([{ topic: 'topic3', partitions: [{ partition: 0, offset: '-1' }] }]);
+
       fetchOffsetsStub.onCall(3).returns([
-        { partition: 0, offset: '3' },
-        { partition: 1, offset: '-1' }
+        {
+          partitions: [
+            { partition: 0, offset: '3' },
+            { partition: 1, offset: '-1' }
+          ]
+        }
       ]);
-      fetchOffsetsStub.returns([{ partition: 0, offset: '-1' }]);
+      fetchOffsetsStub.returns([{ partitions: [{ partition: 0, offset: '-1' }] }]);
       const response = await kafka.renameGroupId([
         { groupId: 'newGroupId', topic: 'topic', oldGroupId: 'oldGroupId' },
         { groupId: 'newGroupId2', topic: 'topic2', oldGroupId: 'oldGroupId2' },
@@ -373,11 +380,11 @@
       ]);
       adminStub.args.should.eql([[]]);
       fetchOffsetsStub.args.should.eql([
-        [{ groupId: 'newGroupId', topic: 'topic', resolveOffsets: false }],
-        [{ groupId: 'newGroupId2', topic: 'topic2', resolveOffsets: false }],
-        [{ groupId: 'newGroupId3', topic: 'topic3', resolveOffsets: false }],
-        [{ groupId: 'oldGroupId', topic: 'topic', resolveOffsets: false }],
-        [{ groupId: 'oldGroupId3', topic: 'topic3', resolveOffsets: false }]
+        [{ groupId: 'newGroupId', topics: ['topic'], resolveOffsets: false }],
+        [{ groupId: 'newGroupId2', topics: ['topic2'], resolveOffsets: false }],
+        [{ groupId: 'newGroupId3', topics: ['topic3'], resolveOffsets: false }],
+        [{ groupId: 'oldGroupId', topics: ['topic'], resolveOffsets: false }],
+        [{ groupId: 'oldGroupId3', topics: ['topic3'], resolveOffsets: false }]
       ]);
       setOffsetsStub.args.should.eql([
         [{ groupId: 'newGroupId', partitions: [{ offset: '3', partition: 0 }], topic: 'topic' }]
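
Reviewer note: the hunks above track the two kafkajs ^2.x API changes this patch adapts to. Below is a minimal standalone sketch of both, for reference while reviewing; the client id, broker address, and the group/topic names are placeholders, not values from this repo.

import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({ clientId: 'sketch', brokers: ['localhost:9092'] }); // placeholder connection

// kafkajs 2.x made the former JavaCompatiblePartitioner the default and logs a
// warning on every producer unless createPartitioner is set explicitly (or
// KAFKAJS_NO_PARTITIONER_WARNING=1 is exported). The patch opts in to the new
// Partitioners.DefaultPartitioner; Partitioners.LegacyPartitioner would keep
// the 1.x key distribution instead.
const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });

async function inspectOffsets(): Promise<void> {
  const admin = kafka.admin();
  await admin.connect();
  // 2.x admin.fetchOffsets takes `topics: string[]` where 1.x took a single
  // `topic`, and returns one entry per topic wrapping a `partitions` array;
  // that is why renameGroupId above indexes `[0]` and reads `.partitions`.
  const [{ topic, partitions }] = await admin.fetchOffsets({
    groupId: 'sketch-group', // placeholder
    topics: ['sketch-topic'], // placeholder
    resolveOffsets: false
  });
  console.log(topic, partitions); // e.g. [{ partition: 0, offset: '-1', metadata: null }] for an unset group
  await admin.disconnect();
}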