Skip to content

Commit

Permalink
[greyhound] parallel consumer - fix skip logic (#36281)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 8fe84370c72e4bf864708f42ed0e6fe435cddc92
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 8, 2023
1 parent 802276c commit 36c7c8e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ object Dispatcher {
private def shouldRecordBeHandled(record: Record, gaps: Map[TopicPartition, OffsetAndGaps]): Boolean = {
gaps.get(TopicPartition(record.topic, record.partition)) match {
case Some(offsetAndGapsForPartition) if offsetAndGapsForPartition.gaps.nonEmpty =>
record.offset > offsetAndGapsForPartition.offset || offsetAndGapsForPartition.gaps.exists(_.contains(record.offset))
record.offset >= offsetAndGapsForPartition.offset || offsetAndGapsForPartition.gaps.exists(_.contains(record.offset))
case _ => true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.wixpress.dst.greyhound.core.consumer.DispatcherMetric.RecordHandled
import com.wixpress.dst.greyhound.core.consumer.RecordConsumer.Env
import com.wixpress.dst.greyhound.core.consumer.SubmitResult.Rejected
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerRecord
import com.wixpress.dst.greyhound.core.consumer.{Dispatcher, SubmitResult}
import com.wixpress.dst.greyhound.core.consumer.{Dispatcher, Gap, OffsetAndGaps, SubmitResult}
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetric
import com.wixpress.dst.greyhound.core.testkit._
import com.wixpress.dst.greyhound.core.zioutils.AwaitShutdown.ShutdownPromise
Expand Down Expand Up @@ -108,6 +108,36 @@ class DispatcherTest extends BaseTest[TestMetrics with TestClock] {
} yield ok) // if execution is not parallel, the latch will not be released
}

"consume records with parallel consumer when prior committed offset and gaps exist" in
new ctx {
val recordOffset2 = ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 2L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L, "")
val recordOffset3 = ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 3L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L, "")

val existingOffsetAndGap = Map(
TopicPartition(topic, partition) -> OffsetAndGaps(2L, Seq(Gap(0, 0)))
) // simulate following situation: only offset 1 was consumed, so 0 is a gap

run(for {
handled <- Ref.make[Int](0)
ref <- Ref.make[Map[TopicPartition, ShutdownPromise]](Map.empty)
init <- getInit

dispatcher <- Dispatcher.make(
"group",
"clientId",
_ => handled.update(_ + 1),
lowWatermark,
highWatermark,
workersShutdownRef = ref,
consumeInParallel = true,
currentGaps = _ => ZIO.succeed(existingOffsetAndGap),
init = init
)
_ <- submitBatch(dispatcher, Seq(recordOffset2, recordOffset3))
numHandled <- handled.get
} yield numHandled must equalTo(2))
}

"reject records when high watermark is reached" in
new ctx() {
run(for {
Expand Down

0 comments on commit 36c7c8e

Please sign in to comment.