Migrate KafkaJS to ^2.0.0 #384
Conversation
Force-pushed from 83d44f1 to a7c9a54
@@ -35,6 +34,7 @@ export default class OrkaKafka {
       authenticationTimeout
     });

+    if (options && !options.createPartitioner) options.createPartitioner = Partitioners.DefaultPartitioner;
Can we add the ability to define the partitioner in docs too?
https://github.com/Workable/orka/blob/master/docs/integrations/kafka.md
🚀
@kalyvasio Can we please also add the breaking changes here: https://workable.github.io/orka/#migrating-from-orka--3x-to-4x?
src/initializers/kafka/kafka.ts
Outdated
    const { username, password } = options.sasl || {};
-   if (username && password) return { sasl: options.sasl, ssl: options.ssl };
+   if (username && password) return { sasl: {...options.sasl }, ssl: options.ssl };
❓ Was this needed?
No, it was a leftover from a reverted refactoring. Fixed in e1bc8c0
src/utils.ts
Outdated
@@ -25,7 +25,7 @@ export const nodeVersionGreaterThanEqual = (requestedVersion: string, version =
 };

 export function appendHeadersFromStore(
-  properties: { headers?: Record<string, string | Buffer | undefined> },
+  properties: { headers?: { [key: string]: Buffer | string | (Buffer | string)[] | undefined } },
❓ Was the previous complaining?
Yes, but apparently for a different reason.
Addressed here: c057664
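For context, the widened signature matches the header shape that KafkaJS 2.x accepts on produced messages, where a header value may also be an array of strings or Buffers. A minimal sketch, assuming the kafkajs IHeaders typing; the sample keys and values are illustrative:

import { IHeaders } from 'kafkajs';

// A header value can be a Buffer, a string, an array of either, or undefined.
const headers: IHeaders = {
  'x-request-id': 'abc-123',
  'x-trace': Buffer.from('0af7651916cd43dd'),
  'x-retries': ['1', '2']
};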
Force-pushed from bf4b326 to e1bc8c0
@@ -39,7 +39,7 @@ function updateSend(kafka: Kafka, config: { traceHeaderName: string }) {
     const traceHeaderName = config.traceHeaderName.toLowerCase();
     appendHeadersFromStore(message, getRequestContext(), config);
     if (!message.key && message.headers[traceHeaderName]) {
-      message.key = message.headers[traceHeaderName];
+      message.key = message.headers[traceHeaderName] as Buffer | string | null;
Shouldn't it accept number keys too?
Suggested change:
- message.key = message.headers[traceHeaderName] as Buffer | string | null;
+ message.key = message.headers[traceHeaderName] as Buffer | string | number | null;
Hmm these are the only values accepted by kafkajs typings for the message key.
I believe that all values are either treated as string or binary data.
Do you have a specific case you want me to take another look at?
FYI, see line 77 in 915d882.
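For reference, the kafkajs 2.x typings constrain the key of a produced message roughly as follows. This is a sketch written from memory of those type definitions, not a verbatim copy:

import { IHeaders } from 'kafkajs';

// Approximate shape of a produced message per the kafkajs 2.x typings (illustrative).
interface Message {
  key?: Buffer | string | null;
  value: Buffer | string | null;
  partition?: number;
  headers?: IHeaders;
  timestamp?: string;
}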
Force-pushed from ae2f079 to c057664
I have updated the docs and added the usage of the Default Partitioner to the Kafka md in this commit.
docs/index.md
Outdated
@@ -75,3 +75,8 @@ ObjectId using new mongoose.
 - Changes in default config: istioTraceHeaders and headerPropagation are deprecated in favor of a generic propagatedHeaders
 - Methods kafka.send that was deprecated is removed. Use kafka.producer.send instead
 - Mongoose migrated to v7 which has breaking changes see [here](https://mongoosejs.com/docs/7.x/docs/migrating_to_7.html)
+
+### Migrating from orka 4.x to 5.x
@kalyvasio this is still migrating from 3.x to 4.x, as we haven't gone out of beta yet. Use one line for the kafka changes.
Force-pushed from edf800b to 5da9d39
Co-authored-by: Kyriakos Lesgidis <[email protected]>
Co-authored-by: Nikos Kostoulas <[email protected]>
Force-pushed from 5da9d39 to dbf6cd5
Bumps KafkaJS to 2.0.0.
It contains two major changes that affect us.
Manage topics through the admin client
KafkaJS 2.0.0 contains some non-backwards-compatible options with regard to managing topics via the admin client.
We have changed the implementation to support the new array handling of topics and offsets.
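For illustration, the kind of array-based admin API this refers to looks roughly like the fetchOffsets change in KafkaJS 2.0, where a single topic argument became a topics array. A sketch; the client id, broker address, group id, and topic name are made up:

import { Kafka } from 'kafkajs';

const admin = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] }).admin();
await admin.connect();

// KafkaJS 1.x took a single `topic`; 2.x takes a `topics` array and
// returns one entry per topic, each with its partition offsets.
const perTopic = await admin.fetchOffsets({ groupId: 'my-group', topics: ['my-topic'] });
for (const { topic, partitions } of perTopic) {
  for (const { partition, offset } of partitions) {
    console.log(topic, partition, offset);
  }
}

await admin.disconnect();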
Default partitioner
Another change is the update of the DefaultPartitioner.
By default the JavaCompatiblePartitioner is now used, but if a partitioner is not explicitly set, KafkaJS will log a warning.
We are setting it by default (if not already set) here.
If for some reason someone needs to use the old partitioner (LegacyPartitioner), it can be set through the .withKafkaOptions(), e.g.
.withKafka({ createPartitioner: Partitioners.LegacyPartitioner })
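A minimal sketch of what that could look like in an application's bootstrap. The orka import path and the builder chaining below are assumptions made for illustration; only the createPartitioner option itself comes from this PR:

import { Partitioners } from 'kafkajs';
// NOTE: the import path and builder usage here are assumed for illustration purposes.
import orka from '@workablehr/orka';

orka({ /* app options */ })
  .withKafka({ createPartitioner: Partitioners.LegacyPartitioner })
  .start();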