Migrate KafkaJS to ^2.0.0 #384

Merged · 5 commits · Jul 30, 2024

1 change: 1 addition & 0 deletions docs/index.md
@@ -75,3 +75,4 @@ ObjectId using new mongoose.
- Changes in default config: istioTraceHeaders and headerPropagation are deprecated in favor of a generic propagatedHeaders
- The deprecated method kafka.send has been removed. Use kafka.producer.send instead (see the sketch after this list)
- Mongoose migrated to v7, which has breaking changes; see [here](https://mongoosejs.com/docs/7.x/docs/migrating_to_7.html)
- The KafkaJS version is now constrained to 2.x.x. Additional information about breaking changes when migrating to KafkaJS 2.x.x can be found at https://kafka.js.org/docs/migration-guide-v2.0.0
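The send API change amounts to swapping the call site. A minimal sketch, assuming `kafka` is an OrkaKafka instance (how you obtain it depends on your setup); topic and message values are placeholders:

```ts
// Before (removed in this release): kafka.send(...)
// After: call the underlying KafkaJS producer directly.
await kafka.producer.send({
  topic: 'my-topic', // placeholder
  messages: [{ key: 'id-1', value: 'hello' }]
});
```
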
21 changes: 21 additions & 0 deletions docs/integrations/kafka.md
@@ -242,3 +242,24 @@ You also need to specify the new group ids in your consumers while previously yo
Once the new group ids are created you can remove the code that copies their offsets from old group ids. This causes no issues though, as it does nothing if the new group ids are found with offsets set.

If you are using renameGroupIds (before creating your consumers), your consumers will continue reading messages from the offset specified by the old groupId, regardless of whether you set the fromBeginning configuration. The fromBeginning configuration is only used if the groupId/topic pair is not found in Kafka, as in the sketch below.
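
A minimal sketch of this flow, assuming `kafka` is an OrkaKafka instance; the method name and argument shape follow this PR's renameGroupId tests, and the group/topic names are placeholders:

```ts
// Copy committed offsets from the old group id to the new one,
// before any consumer with the new group id is created.
await kafka.renameGroupId([
  { groupId: 'new-group', topic: 'my-topic', oldGroupId: 'old-group' }
]);
// Consumers created afterwards resume from the copied offsets; fromBeginning
// only applies when the new groupId/topic pair has no committed offsets.
```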

## Migrating from orka < 4.x.x

Since Orka 4.x.x, the required KafkaJS version is ^2.x.x
(more info on the KafkaJS changes [here](https://kafka.js.org/docs/migration-guide-v2.0.0))

The only notable implication is the update of the DefaultPartitioner (https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner). This may cause
issues in partition-aware environments, since some messages may be produced to different partitions than they would be with the previous default partitioner.

By default, Orka ^4.x.x will use the new DefaultPartitioner (previously named JavaCompatiblePartitioner).
You can use the previous DefaultPartitioner (now renamed to LegacyPartitioner) with:

```js
import { Partitioners } from 'kafkajs';
import { builder } from '@workable/orka'; // orka's builder entry point

builder({ /* …some static options here… */ })
  // ...any other builder configuration
  .withKafka({ createPartitioner: Partitioners.LegacyPartitioner })
  .start(8080);
```

16 changes: 8 additions & 8 deletions package-lock.json


4 changes: 2 additions & 2 deletions package.json
@@ -95,7 +95,7 @@
"dd-trace": "*",
"growthbook": "npm:@growthbook/growthbook@*",
"husky": "^3.0.9",
"kafkajs": "*",
"kafkajs": "^2.0.0",
"lint-staged": "^9.4.3",
"mocha": "^9.2.2",
"mock-require": "^3.0.3",
@@ -120,7 +120,7 @@
"newrelic": "*",
"prom-client": "*",
"pg": "*",
"kafkajs": "*",
"kafkajs": "^2.0.0",
"axios": "*",
"growthbook": "*"
}
2 changes: 1 addition & 1 deletion src/initializers/kafka/index.ts
@@ -39,7 +39,7 @@ function updateSend(kafka: Kafka, config: { traceHeaderName: string }) {
const traceHeaderName = config.traceHeaderName.toLowerCase();
appendHeadersFromStore(message, getRequestContext(), config);
if (!message.key && message.headers[traceHeaderName]) {
- message.key = message.headers[traceHeaderName];
+ message.key = message.headers[traceHeaderName] as Buffer | string | null;

Reviewer comment: Shouldn't it accept number keys too?

Suggested change:
- message.key = message.headers[traceHeaderName] as Buffer | string | null;
+ message.key = message.headers[traceHeaderName] as Buffer | string | number | null;

Contributor Author:
Hmm these are the only values accepted by kafkajs typings for the message key.
I believe that all values are either treated as string or binary data.
Do you have a specific case you'd like me to take another look at?

}
});
const sent: KafkajsType.RecordMetadata[] = await originalSend.call(this, record);
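
For context, a simplified sketch of the key-defaulting behavior discussed in this thread (the producer wrapper and appendHeadersFromStore are elided, and the function name is illustrative):

```ts
// If a message has no explicit key, fall back to the propagated trace header,
// so related messages consistently land on the same partition.
function defaultKeyFromTraceHeader(
  message: { key?: Buffer | string | null; headers: Record<string, Buffer | string | undefined> },
  traceHeaderName: string
): void {
  const header = message.headers[traceHeaderName.toLowerCase()];
  if (!message.key && header) {
    // After the truthy check, header narrows to Buffer | string,
    // which kafkajs accepts as a message key.
    message.key = header;
  }
}
```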
13 changes: 6 additions & 7 deletions src/initializers/kafka/kafka.ts
@@ -3,8 +3,7 @@ import requireInjected from '../../require-injected';
import { KafkaConfig } from '../../typings/kafka';
import type * as KafkajsType from 'kafkajs';
import { flatten, isEmpty } from 'lodash';

- const { Kafka }: typeof KafkajsType = requireInjected('kafkajs');
+ const { Kafka, Partitioners }: typeof KafkajsType = requireInjected('kafkajs');
const logger = getLogger('orka.kafka');

export default class OrkaKafka {
@@ -35,6 +34,7 @@ export default class OrkaKafka {
authenticationTimeout
});

+ if (options && !options.createPartitioner) options.createPartitioner = Partitioners.DefaultPartitioner;

Contributor:

Can we add the ability to define the partitioner in docs too?
https://github.com/Workable/orka/blob/master/docs/integrations/kafka.md
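
In that spirit, a hedged sketch of what such documentation might show: supplying a custom partitioner through the same createPartitioner option this diff defaults. The header-hint logic and all names below are illustrative, not part of this PR:

```ts
import { Partitioners, ICustomPartitioner } from 'kafkajs';
import { builder } from '@workable/orka';

// Illustrative partitioner: honor an optional numeric 'partition-hint' header,
// otherwise delegate to the KafkaJS 2.x DefaultPartitioner.
const hintedPartitioner: ICustomPartitioner = () => {
  const fallback = Partitioners.DefaultPartitioner();
  return ({ topic, partitionMetadata, message }) => {
    const hint = message.headers?.['partition-hint'];
    if (hint !== undefined && !Array.isArray(hint)) {
      const n = Number(hint.toString());
      if (!Number.isNaN(n)) return n % partitionMetadata.length;
    }
    return fallback({ topic, partitionMetadata, message });
  };
};

builder({ /* static options */ })
  .withKafka({ createPartitioner: hintedPartitioner })
  .start(8080);
```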

this.producer = this.produceClient.producer(options);

const { CONNECT, DISCONNECT } = this.producer.events;
@@ -109,11 +109,11 @@
const renamings = await Promise.all(
groupIds
.map(async ({ groupId, topic, oldGroupId }) => {
- const offsets = await admin.fetchOffsets({ groupId, topic, resolveOffsets: false });
- if (offsets.every(({ offset }) => offset === '-1')) {
+ const [offsets] = await admin.fetchOffsets({ groupId, topics: [topic], resolveOffsets: false });
+ if (offsets.partitions.every(({ offset }) => offset === '-1')) {
// groupId is not configured
- const oldOffsets = await admin.fetchOffsets({ groupId: oldGroupId, topic, resolveOffsets: false });
- const knownOffsets = oldOffsets?.filter(o => o.offset !== '-1');
+ const [oldOffsets] = (await admin.fetchOffsets({ groupId: oldGroupId, topics: [topic], resolveOffsets: false }));
+ const knownOffsets = oldOffsets.partitions.filter(o => o.offset !== '-1');
if (!isEmpty(knownOffsets)) await admin.setOffsets({ groupId, topic, partitions: knownOffsets });
return { groupId, renamedFrom: oldGroupId, topic, oldOffsets: knownOffsets };
} else {
@@ -179,7 +179,6 @@ function getAuthOptions(options: {
}) {
const { key, cert, ca } = options.certificates || {};
if (key && cert && ca) return { ssl: { ...options.certificates, ca: flatten([ca]) } };

const { username, password } = options.sasl || {};
if (username && password) return { sasl: options.sasl, ssl: options.ssl };
}
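
The fetchOffsets changes above track the KafkaJS 2.x admin API, which now takes a topics array and returns one entry per topic. A minimal sketch of the new shape (group and topic names are placeholders):

```ts
// KafkaJS 2.x: fetchOffsets accepts `topics` and returns per-topic results:
// [{ topic, partitions: [{ partition, offset, ... }] }]
const [result] = await admin.fetchOffsets({
  groupId: 'my-group',
  topics: ['my-topic'],
  resolveOffsets: false
});
// An offset of '-1' means the group has no committed offset for that partition.
const isConfigured = result.partitions.some(({ offset }) => offset !== '-1');
```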
20 changes: 18 additions & 2 deletions src/typings/kafka.d.ts
@@ -6,7 +6,15 @@ export interface KafkaConfig {
rejectUnauthorized: boolean;
};
sasl?: {
- mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
+ mechanism: 'plain';
+ username: string;
+ password: string;
+ } | {
+ mechanism: 'scram-sha-256';
+ username: string;
+ password: string;
+ } | {
+ mechanism: 'scram-sha-512';
username: string;
password: string;
};
@@ -24,7 +32,15 @@
};
ssl?: boolean;
sasl?: {
- mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
+ mechanism: 'plain';
+ username: string;
+ password: string;
+ } | {
+ mechanism: 'scram-sha-256';
+ username: string;
+ password: string;
+ } | {
+ mechanism: 'scram-sha-512';
username: string;
password: string;
};
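
The sasl typing is now a discriminated union with one variant per mechanism, mirroring how KafkaJS 2.x types its SASL options. A self-contained sketch of the shape (the Sasl alias and credentials are illustrative):

```ts
// The mechanism acts as the discriminant: each branch carries exactly the
// fields that mechanism needs, so invalid combinations fail to compile.
type Sasl =
  | { mechanism: 'plain'; username: string; password: string }
  | { mechanism: 'scram-sha-256'; username: string; password: string }
  | { mechanism: 'scram-sha-512'; username: string; password: string };

const sasl: Sasl = {
  mechanism: 'scram-sha-512',
  username: 'user',   // placeholder credentials
  password: 'secret'
};
```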
2 changes: 1 addition & 1 deletion src/utils.ts
@@ -25,7 +25,7 @@ export const nodeVersionGreaterThanEqual = (requestedVersion: string, version =
};

export function appendHeadersFromStore(
- properties: { headers?: Record<string, string | Buffer | undefined> },
+ properties: { headers?: Record<string, string | Buffer | (Buffer | string)[] | undefined> },
store: Map<string, string | Record<string, string>>,
config: Record<string, any>
) {
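
This widening tracks KafkaJS 2.x header typings, where a header value may also be an array of values. A minimal sketch of a headers object the new signature accepts (header names are placeholders):

```ts
const headers: Record<string, string | Buffer | (Buffer | string)[] | undefined> = {
  'x-request-id': 'abc-123',                      // plain string header
  'x-trace': Buffer.from('trace-bytes'),          // binary header
  'x-forwarded': ['hop-1', Buffer.from('hop-2')]  // arrays allowed in kafkajs 2.x
};
```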
31 changes: 19 additions & 12 deletions test/initializers/kafka/kafka.test.ts
@@ -1,6 +1,8 @@
import * as sinon from 'sinon';
import 'should';
import type KafkaType from '../../../src/initializers/kafka/kafka';
+ import { Partitioners } from 'kafkajs';

const sandbox = sinon.createSandbox();
const mock = require('mock-require');

@@ -50,7 +52,7 @@ describe('kafka class', () => {
kafkaStubReturn = { producer: () => producerStub, consumer: consumerStub, admin: adminStub };
kafkaStub = sinon.stub().returns(kafkaStubReturn);
delete require.cache[require.resolve('../../../src/initializers/kafka/kafka')];
- mock('kafkajs', { Kafka: kafkaStub });
+ mock('kafkajs', { Kafka: kafkaStub, Partitioners });
({ default: Kafka } = await import('../../../src/initializers/kafka/kafka'));
});

@@ -358,14 +360,19 @@
},
ssl: true
});
- fetchOffsetsStub.onFirstCall().returns([{ partition: 0, offset: '-1' }]);
- fetchOffsetsStub.onSecondCall().returns([{ partition: 0, offset: '5' }]);
- fetchOffsetsStub.onThirdCall().returns([{ partition: 0, offset: '-1' }]);
+ fetchOffsetsStub.onFirstCall().returns([{ topic: 'topic', partitions: [{ partition: 0, offset: '-1' }] }]);
+ fetchOffsetsStub.onSecondCall().returns([{ topic: 'topic2', partitions: [{ partition: 0, offset: '5' }] }]);
+ fetchOffsetsStub.onThirdCall().returns([{ topic: 'topic3', partitions: [{ partition: 0, offset: '-1' }] }]);

  fetchOffsetsStub.onCall(3).returns([
-   { partition: 0, offset: '3' },
-   { partition: 1, offset: '-1' }
+   {
+     partitions: [
+       { partition: 0, offset: '3' },
+       { partition: 1, offset: '-1' }
+     ]
+   }
  ]);
- fetchOffsetsStub.returns([{ partition: 0, offset: '-1' }]);
+ fetchOffsetsStub.returns([{ partitions: [{ partition: 0, offset: '-1' }] }]);
const response = await kafka.renameGroupId([
{ groupId: 'newGroupId', topic: 'topic', oldGroupId: 'oldGroupId' },
{ groupId: 'newGroupId2', topic: 'topic2', oldGroupId: 'oldGroupId2' },
@@ -374,11 +381,11 @@

adminStub.args.should.eql([[]]);
fetchOffsetsStub.args.should.eql([
- [{ groupId: 'newGroupId', topic: 'topic', resolveOffsets: false }],
- [{ groupId: 'newGroupId2', topic: 'topic2', resolveOffsets: false }],
- [{ groupId: 'newGroupId3', topic: 'topic3', resolveOffsets: false }],
- [{ groupId: 'oldGroupId', topic: 'topic', resolveOffsets: false }],
- [{ groupId: 'oldGroupId3', topic: 'topic3', resolveOffsets: false }]
+ [{ groupId: 'newGroupId', topics: ['topic'], resolveOffsets: false }],
+ [{ groupId: 'newGroupId2', topics: ['topic2'], resolveOffsets: false }],
+ [{ groupId: 'newGroupId3', topics: ['topic3'], resolveOffsets: false }],
+ [{ groupId: 'oldGroupId', topics: ['topic'], resolveOffsets: false }],
+ [{ groupId: 'oldGroupId3', topics: ['topic3'], resolveOffsets: false }]
]);
setOffsetsStub.args.should.eql([
[{ groupId: 'newGroupId', partitions: [{ offset: '3', partition: 0 }], topic: 'topic' }]