Partition lost not recovering, possible issue with RunLoop
#1250
Comments
Thanks @josdirksen, that is some awesome spelunking there. I think you have found all the right places in the code, and your analysis seems correct. We can fix `requestAndAwaitData`:

```scala
val requestAndAwaitData =
  for {
    _     <- commandQueue.offer(RunloopCommand.Request(tp))
    _     <- diagnostics.emit(DiagnosticEvent.Request(tp))
    taken <- dataQueue
               .takeBetween(1, Int.MaxValue)
               .race(interruptionPromise.await) // <-- added race here
  } yield taken
```

That should do it. However, I am beginning to wonder if we should fail the stream like this at all! It seems that lost partitions are more common than we thought. I was always working under the assumption that lost partitions are 'end of the world' type situations, e.g. network splits, or the network being lost for a long time, where any processing that is still going on should be aborted ASAP. Perhaps we should return to the situation we had before, where we treated a lost partition the same as a revoked partition. OR, we could treat it as a revoked partition when the internal queues are empty anyway... 🤔
When a partition is lost while there is no traffic, `PartitionStreamControl` is blocked waiting for data. We fix this by racing with the `interruptionPromise`. (Thanks @josdirksen for the analysis! See #1250.)

Note: this situation can only occur with lost partitions. Timeouts (the other reason for interrupts) do not occur when there is no traffic. Currently, we have no way to test lost partitions; therefore, no tests were added.

This PR does _not_ change how lost partitions are handled: the stream for the partition that is lost is interrupted, the other streams are closed gracefully, and the consumer aborts with an error.
Before 2.7.0 a lost partition was treated as a revoked partition. Since the partition is already assigned to another node, this potentially leads to duplicate processing of records.

Zio-kafka 2.7.0 assumes that a lost partition is a fatal event. It leads to an interrupt in the stream that handles the partition. The other streams are ended, and the consumer closes with an error. Usually, a full program restart is needed to resume consuming.

It should be noted that stream processing is not interrupted immediately; the interrupt is only observed once the stream requests new records. Unfortunately, we have not found a clean way to interrupt the stream consumer directly. Meanwhile, from bug reports (#1233, #1250), we understand that partitions are usually lost when no records have been received for a long time. In conclusion: 1) it is not possible to immediately interrupt user stream processing, and 2) it is most likely not needed anyway, because the stream is already done processing and awaiting more records.

With this change, a lost partition no longer leads to an interrupt. Instead, we first drain the stream's internal queue (just to be sure, it is probably already empty), and then we end the stream gracefully (that is, without error, like we do with revoked partitions). Other streams are not affected, and the consumer continues to work.

Lost partitions do not affect the features `rebalanceSafeCommits` and `restartStreamsOnRebalancing`: they do _not_ hold up a rebalance waiting for commits to complete, and they do _not_ lead to restarts of other streams.

Clients that want to handle the partition-lost event, instead of handling the failed stream, need to create their own `RebalanceListener` and handle the `onLost` call.

Fixes #1233 and #1250.
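The paragraph above suggests handling lost partitions via a custom `RebalanceListener`. A minimal sketch of what that could look like is below; note that the exact callback signatures of zio-kafka's `RebalanceListener` have changed between versions, so treat the parameter shapes here as assumptions to check against your version, not the literal API:

```scala
import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.consumer.{ ConsumerSettings, RebalanceListener }

// Hedged sketch: react to lost partitions with a custom RebalanceListener.
// `alert` is a hypothetical user-supplied effect (e.g. log a warning or
// trigger an application-level recovery).
def withLostPartitionAlert(
  settings: ConsumerSettings,
  alert: Set[TopicPartition] => UIO[Unit]
): ConsumerSettings =
  settings.withRebalanceListener(
    RebalanceListener(
      onAssigned = (assigned, _) => ZIO.unit,
      onRevoked  = (revoked, _)  => ZIO.unit,
      onLost     = (lost, _)     => alert(lost) // partitions lost, not revoked
    )
  )
```

Because lost partitions no longer fail the stream after this change, this listener is the place to observe them.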
Fixes #1288. See also #1233 and #1250.

When all partitions are lost after a connection issue with the broker, the streams for the lost partitions are ended but polling stops, due to the conditions in `Runloop.State#shouldPoll`. This PR fixes that by removing the lost partition streams from the `assignedStreams` in the state, so that polling is not disabled.

Also adds a warning that is logged whenever the assigned partitions (according to the apache kafka consumer) differ from the assigned streams, which helps to identify other issues or any future regressions of this issue.

~Still needs a good test; the `MockConsumer` used in other tests unfortunately does not allow simulating lost partitions, and the exact behavior of the kafka client in this situation is hard to predict.~ Includes a test that fails when undoing the change to `Runloop`.
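The interaction described above can be illustrated with a simplified model (this is an assumed sketch, not the actual `Runloop` code; real state also tracks commits, subscription state, etc.): if ended-but-still-tracked streams keep `assignedStreams` non-empty while no stream issues requests, `shouldPoll` stays false and the consumer stops polling. Removing the lost streams restores polling:

```scala
// Simplified model of the polling condition. Partitions are plain strings
// here to keep the sketch dependency-free.
final case class State(assignedStreams: Set[String], subscribed: Boolean, pendingRequests: Int) {
  // Poll while subscribed and either there is demand, or nothing is assigned yet.
  def shouldPoll: Boolean = subscribed && (pendingRequests > 0 || assignedStreams.isEmpty)
}

// The fix, in spirit: forget the streams of lost partitions.
def onLostPartitions(state: State, lost: Set[String]): State =
  state.copy(assignedStreams = state.assignedStreams -- lost)
```

With all partitions lost and their (ended) streams still tracked, `shouldPoll` is false; after `onLostPartitions` removes them, `assignedStreams` is empty and polling resumes, so new assignments can be picked up.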
@josdirksen Version 2.8.3 no longer fails on lost partitions and properly continues polling. I am going to close this issue. Please re-open or create a new one if you still have problems.
This might be related to the #1233 issue, but over the last couple of weeks / months we have seen issues where, after a partition is lost, it doesn't recover correctly. We've tried to analyze and debug it, but this occurs so infrequently that we haven't been able to isolate it.

By analyzing the code we might have identified the reason, but there is so much async stuff happening there that we might be interpreting things wrongly.

What happens in our case is the following:

For partitions that are revoked, everything seems to be working correctly though.
What we see as a possible cause is this. In the `Runloop`, the handling of lost partitions results in a call to `lost` on the `PartitionStreamControl`, which fails the `interruptionPromise`.

Looking at the way the `interruptionPromise` is handled, this doesn't seem to work correctly when there are no records to be processed. In `PartitionStreamControl` there is a repeating effect that checks the `interruptionPromise` to see whether this effect needs to be interrupted. But how would this work if there are no active chunks to process? The `requestAndAwaitData` function blocks the current fiber until at least 1 element is taken. So when the `lost` function fails the promise, that promise is never checked, since no records are coming in on the `dataQueue` (or I'm reading stuff wrong here, which is of course also possible).

For the revoke flow, the `dataQueue` gets an additional `Take.end` to get out of the `requestAndAwaitData` wait state, but that doesn't happen for the `lost` scenario. So, shouldn't the code for lost also make sure the `dataQueue` at least gets some value? Otherwise it seems to be stuck in the `requestAndAwaitData` loop indefinitely.