-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[greyhound] parallel consumer OffsetsAndGaps (#33605)
GitOrigin-RevId: f178c94663c7cbcb22bd7266c3e15919d8997d8c
- Loading branch information
1 parent
59ed17c
commit 221ee80
Showing
2 changed files
with
154 additions
and
0 deletions.
There are no files selected for viewing
92 changes: 92 additions & 0 deletions
92
core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package com.wixpress.dst.greyhound.core.consumer | ||
|
||
import com.wixpress.dst.greyhound.core.{Offset, TopicPartition} | ||
import zio._ | ||
|
||
trait OffsetsAndGaps { | ||
def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]] | ||
|
||
def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] | ||
|
||
def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit] | ||
|
||
def contains(partition: TopicPartition, offset: Offset): UIO[Boolean] | ||
} | ||
|
||
object OffsetsAndGaps { | ||
def make: UIO[OffsetsAndGaps] = | ||
Ref.make(Map.empty[TopicPartition, OffsetAndGaps]).map { ref => | ||
new OffsetsAndGaps { | ||
override def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]] = | ||
ref.modify(offsetsAndGaps => { | ||
val committable = offsetsAndGaps.filter(_._2.committable) | ||
val updated = offsetsAndGaps.mapValues(_.markCommitted) | ||
(committable, updated) | ||
}) | ||
|
||
override def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] = | ||
ref.get.map(_.get(partition).fold(Seq.empty[Gap])(_.gaps.sortBy(_.start))) | ||
|
||
override def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit] = | ||
ref.update { offsetsAndGaps => | ||
val sortedBatch = batch.sorted | ||
val maxBatchOffset = sortedBatch.last | ||
val maybeOffsetAndGaps = offsetsAndGaps.get(partition) | ||
val prevOffset = maybeOffsetAndGaps.fold(-1L)(_.offset) | ||
val partitionOffsetAndGaps = maybeOffsetAndGaps.fold(OffsetAndGaps(maxBatchOffset))(identity) | ||
|
||
val newGaps = gapsInBatch(sortedBatch, prevOffset) | ||
|
||
val updatedGaps = updateGapsByOffsets( | ||
partitionOffsetAndGaps.gaps ++ newGaps, | ||
sortedBatch | ||
) | ||
|
||
offsetsAndGaps + (partition -> OffsetAndGaps(maxBatchOffset max prevOffset, updatedGaps)) | ||
}.unit | ||
|
||
override def contains(partition: TopicPartition, offset: Offset): UIO[Boolean] = | ||
ref.get.map(_.get(partition).fold(false)(_.contains(offset))) | ||
|
||
private def gapsInBatch(batch: Seq[Offset], prevLastOffset: Offset): Seq[Gap] = | ||
batch.sorted | ||
.foldLeft(Seq.empty[Gap], prevLastOffset) { | ||
case ((gaps, lastOffset), offset) => | ||
if (offset <= lastOffset) (gaps, lastOffset) | ||
else if (offset == lastOffset + 1) (gaps, offset) | ||
else { | ||
val newGap = Gap(lastOffset + 1, offset - 1) | ||
(newGap +: gaps, offset) | ||
} | ||
} | ||
._1 | ||
.reverse | ||
|
||
private def updateGapsByOffsets(gaps: Seq[Gap], offsets: Seq[Offset]): Seq[Gap] = { | ||
val gapsToOffsets = gaps.map(gap => gap -> offsets.filter(o => o >= gap.start && o <= gap.end)).toMap | ||
gapsToOffsets.flatMap { | ||
case (gap, offsets) => | ||
if (offsets.isEmpty) Seq(gap) | ||
else if (offsets.size == (gap.size)) Seq.empty[Gap] | ||
else gapsInBatch(offsets ++ Seq(gap.start - 1, gap.end + 1), gap.start - 2) | ||
}.toSeq | ||
} | ||
} | ||
} | ||
} | ||
|
||
case class Gap(start: Offset, end: Offset) { | ||
def contains(offset: Offset): Boolean = start <= offset && offset <= end | ||
|
||
def size: Long = end - start + 1 | ||
} | ||
|
||
case class OffsetAndGaps(offset: Offset, gaps: Seq[Gap], committable: Boolean = true) { | ||
def contains(offset: Offset): Boolean = gaps.exists(_.contains(offset)) | ||
|
||
def markCommitted: OffsetAndGaps = copy(committable = false) | ||
} | ||
|
||
object OffsetAndGaps { | ||
def apply(offset: Offset): OffsetAndGaps = OffsetAndGaps(offset, Seq.empty[Gap]) | ||
} |
62 changes: 62 additions & 0 deletions
62
core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGapsTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package com.wixpress.dst.greyhound.core.consumer | ||
|
||
import com.wixpress.dst.greyhound.core.TopicPartition | ||
import com.wixpress.dst.greyhound.core.consumer.OffsetGapsTest._ | ||
import com.wixpress.dst.greyhound.core.testkit.BaseTestNoEnv | ||
|
||
class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv { | ||
|
||
"calculate gaps created by handled batch" in { | ||
for { | ||
offsetGaps <- OffsetsAndGaps.make | ||
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L)) | ||
currentGaps <- offsetGaps.gapsForPartition(topicPartition) | ||
} yield currentGaps must beEqualTo(Seq(Gap(0L, 0L), Gap(2L, 2L), Gap(4L, 6L))) | ||
} | ||
|
||
"update offset and gaps according to handled batch" in { | ||
for { | ||
offsetGaps <- OffsetsAndGaps.make | ||
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L)) | ||
_ <- offsetGaps.update(topicPartition, Seq(2L, 5L)) | ||
getCommittableAndClear <- offsetGaps.getCommittableAndClear | ||
} yield getCommittableAndClear must havePair(topicPartition -> OffsetAndGaps(7L, Seq(Gap(0L, 0L), Gap(4L, 4L), Gap(6L, 6L)))) | ||
} | ||
|
||
"clear committable offsets" in { | ||
for { | ||
offsetGaps <- OffsetsAndGaps.make | ||
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L)) | ||
_ <- offsetGaps.getCommittableAndClear | ||
getCommittableAndClear <- offsetGaps.getCommittableAndClear | ||
} yield getCommittableAndClear must beEmpty | ||
} | ||
|
||
"do not clear gaps on retrieving current" in { | ||
for { | ||
offsetGaps <- OffsetsAndGaps.make | ||
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L)) | ||
_ <- offsetGaps.gapsForPartition(topicPartition) | ||
currentGaps <- offsetGaps.gapsForPartition(topicPartition) | ||
} yield currentGaps must beEqualTo(Seq(Gap(0L, 0L), Gap(2L, 2L), Gap(4L, 6L))) | ||
} | ||
|
||
"update with larger offset" in { | ||
val partition0 = TopicPartition(topic, 0) | ||
val partition1 = TopicPartition(topic, 1) | ||
|
||
for { | ||
offsetGaps <- OffsetsAndGaps.make | ||
_ <- offsetGaps.update(partition0, Seq(1L)) | ||
_ <- offsetGaps.update(partition0, Seq(0L)) | ||
_ <- offsetGaps.update(partition1, Seq(0L)) | ||
current <- offsetGaps.getCommittableAndClear | ||
} yield current must havePairs(partition0 -> OffsetAndGaps(1L, Seq()), partition1 -> OffsetAndGaps(0L, Seq())) | ||
} | ||
|
||
} | ||
|
||
object OffsetGapsTest { | ||
val topic = "some-topic" | ||
val topicPartition = TopicPartition(topic, 0) | ||
} |