Skip to content

Commit

Permalink
Allow stream to be interrupted when there is no traffic
Browse files Browse the repository at this point in the history
When a partition is lost while there is no traffic, `PartitionStreamControl` is blocked waiting for data. We fix this by racing with the `interruptPromise`. (Thanks @josdirksen for the analysis!)

Note: this situation can only occur with lost partitions. Timeouts (the other reason for interrupts) do not occur when there is no traffic. Currently, we have no way to test lost partitions. Therefore, there are no added tests.

See #1250.
  • Loading branch information
erikvanoosten committed Jun 7, 2024
1 parent b7c5894 commit 1b16bbd
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ object PartitionStreamControl {
queueInfo <- Ref.make(QueueInfo(now, 0, None, 0))
requestAndAwaitData =
for {
_ <- commandQueue.offer(RunloopCommand.Request(tp))
_ <- diagnostics.emit(DiagnosticEvent.Request(tp))
taken <- dataQueue.takeBetween(1, Int.MaxValue)
_ <- commandQueue.offer(RunloopCommand.Request(tp))
_ <- diagnostics.emit(DiagnosticEvent.Request(tp))
taken <- dataQueue
.takeBetween(1, Int.MaxValue)
.race(interruptionPromise.await.as(Chunk.empty))
} yield taken

stream = ZStream.logAnnotate(
Expand Down

0 comments on commit 1b16bbd

Please sign in to comment.