Skip to content

Commit

Permalink
Remove lost partitions from assigned streams (#1350)
Browse files Browse the repository at this point in the history
Fixes #1288. See also #1233 and #1250.

When all partitions are lost after some connection issue to the broker,
the streams for lost partitions are ended but polling stops, due to the
conditions in `Runloop.State#shouldPoll`. This PR fixes this by removing
the lost partition streams from the `assignedStreams` in the state,
thereby not disabling polling.

Also adds a warning that is logged whenever the assigned partitions
(according to the Apache Kafka consumer) are different from the assigned
streams, which helps to identify other issues or any future regressions
of this issue.

~Still needs a good test, the `MockConsumer` used in other tests
unfortunately does not allow simulating lost partitions, and the exact
behavior of the kafka client in this situation is hard to predict..~
Includes a test that fails when the change to Runloop is undone.
  • Loading branch information
svroonland authored Oct 29, 2024
1 parent aa3bc7c commit 5e4b287
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.{ ConsumerRecord, MockConsumer, OffsetResetStrategy }
import org.apache.kafka.clients.consumer.{
ConsumerRebalanceListener,
ConsumerRecord,
MockConsumer,
OffsetResetStrategy
}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException }
import zio._
import zio.kafka.consumer.{ ConsumerSettings, Subscription }
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.kafka.consumer.{ ConsumerSettings, Subscription }
import zio.metrics.{ MetricState, Metrics }
import zio.stream.{ Take, ZStream }
import zio.test.TestAspect.withLiveClock
import zio.test._

import java.util
import scala.jdk.CollectionConverters._

object RunloopSpec extends ZIOSpecDefault {
object RunloopSpec extends ZIOSpecDefaultSlf4j {

private type BinaryMockConsumer = MockConsumer[Array[Byte], Array[Byte]]
private type PartitionsHub = Hub[Take[Throwable, PartitionAssignment]]
Expand Down Expand Up @@ -93,6 +100,55 @@ object RunloopSpec extends ZIOSpecDefault {
}
}
},
// Regression test: after the consumer reports partitions as lost, the runloop must keep
// polling so that partition streams produced by later rebalances are still delivered.
test(
"runloop continues polling after a lost partition"
) {
Diagnostics.SlidingQueue.make(100).flatMap { diagnostics =>
// Mutable holder for the listener passed to `subscribe`; stays null until `subscribe` is called.
var rebalanceListener: ConsumerRebalanceListener = null

// Catches the rebalance listener so we can use it
val mockConsumer: BinaryMockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {
override def subscribe(
topics: util.Collection[String],
listener: ConsumerRebalanceListener
): Unit = {
rebalanceListener = listener
super.subscribe(topics, listener)
}
}

withRunloop(diagnostics, mockConsumer) { (mockConsumer, partitionsHub, runloop) =>
// Scheduled poll task 1: set the end offsets and assign both partitions.
mockConsumer.schedulePollTask { () =>
mockConsumer.updateEndOffsets(Map(tp10 -> Long.box(0L), tp11 -> Long.box(0L)).asJava)
mockConsumer.rebalance(Seq(tp10, tp11).asJava)
}
// Scheduled poll task 2: report both partitions as lost by calling the captured listener directly.
mockConsumer.schedulePollTask { () =>
rebalanceListener.onPartitionsLost(Seq(tp10, tp11).asJava)
}
// Scheduled poll task 3: rebalance to no partitions, then back to both partitions.
mockConsumer.schedulePollTask { () =>
mockConsumer.rebalance(Seq.empty.asJava)
mockConsumer.rebalance(Seq(tp10, tp11).asJava)
}

for {
// Subscribe to the hub before adding the subscription.
// NOTE(review): presumably so that no partition assignment published to the hub is missed — confirm.
streamStream <- ZStream.fromHubScoped(partitionsHub)
_ <- runloop.addSubscription(Subscription.Topics(Set(tp10, tp11).map(_.topic())))
result <- streamStream
.map(_.exit)
.flattenExitOption
.flattenChunks
// NOTE(review): three streams is presumably the two initial assignments plus at least one
// stream from the re-assignment after the loss — confirm.
.take(3)
// Run each partition stream until it emits its first element or ends.
.mapZIO { case (_, stream) =>
stream.runHead
}
.runDrain
// `timeout` yields None when the pipeline above has not completed within 10 seconds.
.timeout(10.seconds)
} yield assertTrue(
result.isDefined
) // Test will not finish if polling did not continue after partitions lost
}
}
},
test("runloop retries poll upon AuthorizationException and AuthenticationException") {
withRunloop() { (mockConsumer, partitionsHub, runloop) =>
mockConsumer.schedulePollTask { () =>
Expand Down Expand Up @@ -131,11 +187,13 @@ object RunloopSpec extends ZIOSpecDefault {
}
) @@ withLiveClock

private def withRunloop(diagnostics: Diagnostics = Diagnostics.NoOp)(
private def withRunloop(
diagnostics: Diagnostics = Diagnostics.NoOp,
mockConsumer: BinaryMockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST)
)(
f: (BinaryMockConsumer, PartitionsHub, Runloop) => ZIO[Scope, Throwable, TestResult]
): ZIO[Scope, Throwable, TestResult] =
ZIO.scoped {
val mockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST)
for {
consumerAccess <- ConsumerAccess.make(mockConsumer)
consumerScope <- ZIO.scope
Expand Down
18 changes: 15 additions & 3 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ private[consumer] final class Runloop private (
state <- currentStateRef.get
lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp))
_ <- ZIO.foreachDiscard(lostStreams)(_.lost)
_ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps))
_ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps, lostStreams))
_ <- ZIO.logTrace(s"onLost done")
} yield ()
)
Expand Down Expand Up @@ -560,6 +560,17 @@ private[consumer] final class Runloop private (
ended = endedStreams.map(_.tp).toSet
)
)
// Warn when assigned partitions and streams are out of sync: every assigned partition should have exactly one stream, and no stream should exist for an unassigned partition
_ <-
ZIO
.logWarning(
s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${currentAssigned.mkString(",")}, streams: ${updatedAssignedStreams.map(_.tp).mkString(",")}"
)
.when(
currentAssigned != updatedAssignedStreams
.map(_.tp)
.toSet || currentAssigned.size != updatedAssignedStreams.size
)
} yield Runloop.PollResult(
records = polledRecords,
ignoreRecordsForTps = ignoreRecordsForTps,
Expand Down Expand Up @@ -830,11 +841,12 @@ object Runloop {
endedStreams = this.endedStreams ++ endedStreams
)

def onLost(lost: Set[TopicPartition]): RebalanceEvent =
def onLost(lost: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent =
copy(
wasInvoked = true,
assignedTps = assignedTps -- lost,
lostTps = lostTps ++ lost
lostTps = lostTps ++ lost,
endedStreams = this.endedStreams ++ endedStreams
)
}

Expand Down

0 comments on commit 5e4b287

Please sign in to comment.