-
Notifications
You must be signed in to change notification settings - Fork 139
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure calls to Producer.produce* do not block #581
Conversation
Records to be produced are placed in Chunks in a bounded ZIO Queue, which is processed by a stream running on the Blocking thread pool. The result is communicated back via a Promise. This way we can be sure that calls to `Producer.produce*` never block the ZIO thread pool without incurring the overhead of shifting to the blocking thread pool for every record or chunk of records.
producer.close | ||
} | ||
) | ||
runtime <- ZIO.runtime[Any] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala
Outdated
Show resolved
Hide resolved
…cala Co-authored-by: Jules Ivanic <[email protected]>
LGTM We really need benchmarks to see if what we're doing really improve things or not |
) | ||
} | ||
} | ||
serializedRecords <- ZIO.foreach(records)(serialize(_, keySerializer, valueSerializer)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if it'd be possible to be lazier here and to serialize each event independently only just before sending it, instead of serializing them all and then sending them all 🤔
Well, I made a POC of this idea here: #582
I'd like to get your opinion on it
(I also made an interruptible version of this POC here: #583)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tested the performance of my proposal and it's actually way slower. See #531 (comment)
The interruptible version doesn't even work (yet)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made the interruptible version work but it's super slow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if it'd be possible to be lazier here and to serialize each event independently only just before sending it, instead of serializing them all and then sending them all 🤔
Conclusion: Doesn't seem to be possible without a massive loss of performances. So it's better to keep it this way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Always good to experiment :) I do know that provide
or provideEnvironment
calls have some overhead, which is why it's recommended to do it at the outer edges of your ZIO workflows.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do know that provide or provideEnvironment calls have some overhead
Yeah they def do have some overhead
I ran those benchmarks myself as well, did not see any noticeable slowdown. Thanks for the review and improvements! |
@guizmaii Looks like I need an approval again after the latest changes. The merge rules for this repo are quite strict it appears 😅 |
@svroonland You're good to go :) |
Records to be produced are placed in Chunks in a bounded ZIO Queue, which is processed by a stream running on the Blocking thread pool. The result is communicated back via a Promise. This way we can be sure that calls to
Producer.produce*
never block the ZIO thread pool without incurring the overhead of shifting to the blocking thread pool for every record or chunk of records.Also