Lost records on frequent commits and pause/resume #118
Comments
@ppatierno ping |
Just a clarification on the last part concerning the exception: reading it back, I realized I had expressed it rather inaccurately. The exception was a result of the following snippet:
As soon as the timer fires and completes the future, the exception occurs (on every call and, as stated, regardless of the timeout value). |
Can you provide tests for this? |
Here's a quick test, representing a stripped down version of our current setup.
EDIT: updated test code |
Sorry for being late, guys. I am taking a look at it right now. |
@Seraksab I also read ...
It seems to be a different bug. Did you notice this one in your use case only or in general? |
@Seraksab the example is not usable as it is because it has some classes that I guess come from your application (GenericData, Kafka.ValueHandler, ...) but I will try to change it a little bit for reproducing the issue. |
@Seraksab I changed my mind. I think it's better to get something from you that we can actually run in a simpler way. I would also remove the schema registry dependency and use messages with keys and values as strings. Is that something you can provide? |
So far I only noticed this in this specific use case.
Sorry, my fault. I just quickly stripped down our internal test and seem to have missed a few classes. I have updated my previously posted example to avoid spamming the comments with code. |
Just a quick follow-up on the commit handler not being called. In my case this always runs into the set timeout without the specified completionHandler being called. However, removing the 'pause()' call seems to resolve this issue, and the handler is called as expected. Is this behaviour intended? I can't find it in the documentation.
|
Hi
We are currently experiencing lost records from Kafka that are not delivered to the registered data handler.
We did some further investigation and testing, and there seems to be an issue with how commits are handled.
Our general procedure looks as follows:
The consumer is subscribed to one topic only.
If we work our way through a topic with tens of thousands of records, the first few iterations always succeed, but at some point records are skipped and not delivered to the registered data handler (hundreds in some cases!).
Simply adding a delay of 500ms after calling commit seems to fix the problem and we are receiving all of the records correctly.
In a further attempt, we tried replacing the added delay by calling commit with a completion handler instead and waiting for the operation to complete before continuing.
However the passed handler is never called.
After adding a timeout timer that completes the future in case the commit completion handler is not called in time, we see the following exception right after the timer fires and completes the future:
(the timeout seems to not matter at all, as we've tried with values from 500ms up to 30s and the result is the same)
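For readers following along, the "wait for the commit handler, but give up after a timeout" pattern described above can be sketched in plain Java. This is only an illustration, not the code from our setup: `commitCall` is a hypothetical stand-in for the consumer's commit-with-handler call, `CommitTimeoutSketch` and `Outcome` are made-up names, and a JDK `CompletableFuture` (Java 9+) is used in place of the Vert.x future and timer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class CommitTimeoutSketch {

    /** Outcome of waiting for a commit acknowledgement. */
    enum Outcome { COMMITTED, TIMED_OUT }

    /**
     * Starts a commit and returns a future that completes with COMMITTED when the
     * completion handler fires, or with TIMED_OUT if it has not fired within timeoutMs.
     *
     * commitCall is a hypothetical stand-in (an assumption, not the real client API):
     * it receives a callback that it is expected to run once the commit is done.
     */
    static CompletableFuture<Outcome> commitWithTimeout(Consumer<Runnable> commitCall,
                                                        long timeoutMs) {
        CompletableFuture<Outcome> done = new CompletableFuture<>();
        // Hand the "commit finished" callback to the (simulated) commit operation.
        commitCall.accept(() -> done.complete(Outcome.COMMITTED));
        // Fallback: complete the same future ourselves if the callback never arrives.
        return done.completeOnTimeout(Outcome.TIMED_OUT, timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // A commit whose handler is invoked right away.
        Outcome ok = commitWithTimeout(handler -> handler.run(), 500).join();
        // A commit whose handler is never invoked, as observed in this report.
        Outcome lost = commitWithTimeout(handler -> { }, 500).join();
        System.out.println(ok + " " + lost);
    }
}
```

With this shape, the processing loop would only continue once the returned future completes, and a `TIMED_OUT` result makes the "handler never called" case visible instead of blocking forever.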