Skip to content

Commit

Permalink
[greyhound] fix OffsetsInitializer metadata bug (#35684)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: d9758275c502278ed9fd845b3781cd92b7152f8a
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 8, 2023
1 parent f765cb3 commit b9180d0
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.wixpress.dst.greyhound.core.{Offset, OffsetAndMetadata, TopicPartitio
import zio._

import java.util.Base64
import scala.util.Try

trait OffsetsAndGaps {
def init(committedOffsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit]
Expand Down Expand Up @@ -111,11 +112,9 @@ object OffsetsAndGaps {

def parseGapsString(rawOffsetAndGapsString: String): Option[OffsetAndGaps] = {
val offsetAndGapsString =
if (rawOffsetAndGapsString.nonEmpty)
new String(
GzipCompression.decompress(Base64.getDecoder.decode(rawOffsetAndGapsString)).getOrElse(Array.empty)
)
else ""
if (rawOffsetAndGapsString.nonEmpty) {
Try(new String(GzipCompression.decompress(Base64.getDecoder.decode(rawOffsetAndGapsString)).getOrElse(Array.empty))).getOrElse("")
} else ""
val lastHandledOffsetSeparatorIndex = offsetAndGapsString.indexOf(LAST_HANDLED_OFFSET_SEPARATOR)
if (lastHandledOffsetSeparatorIndex < 0)
None
Expand All @@ -132,13 +131,19 @@ object OffsetsAndGaps {
}
}

def firstGapOffset(gapsString: String): Option[Offset] = {
private def firstGapOffset(gapsString: String): Option[Offset] = {
val maybeOffsetAndGaps = parseGapsString(gapsString)
maybeOffsetAndGaps match {
case Some(offsetAndGaps) if offsetAndGaps.gaps.nonEmpty => Some(offsetAndGaps.gaps.minBy(_.start).start)
case _ => None
}
}

def gapsSmallestOffsets(offsets: Map[TopicPartition, Option[OffsetAndMetadata]]): Map[TopicPartition, OffsetAndMetadata] =
offsets
.collect { case (tp, Some(om)) => tp -> om }
.map(tpom => tpom._1 -> (firstGapOffset(tpom._2.metadata), tpom._2.metadata))
.collect { case (tp, (Some(offset), metadata)) => tp -> OffsetAndMetadata(offset, metadata) }
}

case class Gap(start: Offset, end: Offset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.wixpress.dst.greyhound.core.consumer

import java.time.Clock
import com.wixpress.dst.greyhound.core.consumer.ConsumerMetric.{CommittedMissingOffsets, CommittedMissingOffsetsFailed, SkippedGapsOnInitialization}
import com.wixpress.dst.greyhound.core.consumer.OffsetsAndGaps.{firstGapOffset, parseGapsString}
import com.wixpress.dst.greyhound.core.{ClientId, Group, Offset, OffsetAndMetadata, TopicPartition}
import com.wixpress.dst.greyhound.core.metrics.{GreyhoundMetric, GreyhoundMetrics}
import zio.{URIO, ZIO}
Expand Down Expand Up @@ -82,10 +81,7 @@ class OffsetsInitializer(
val seekToEndPartitions = seekTo.collect { case (k, SeekTo.SeekToEnd) => k }.toSet
val toPause = seekTo.collect { case (k, SeekTo.Pause) => k }
val seekToEndOffsets = fetchEndOffsets(seekToEndPartitions, timeout).mapValues(OffsetAndMetadata.apply)
val gapsSmallestOffsets = currentCommittedOffsets
.collect { case (tp, Some(om)) => tp -> om }
.map(tpom => tpom._1 -> (firstGapOffset(tpom._2.metadata), tpom._2.metadata))
.collect { case (tp, (Some(offset), metadata)) => tp -> OffsetAndMetadata(offset, metadata) }
val gapsSmallestOffsets = OffsetsAndGaps.gapsSmallestOffsets(currentCommittedOffsets)
val seekToGapsOffsets = if (parallelConsumer) gapsSmallestOffsets else Map.empty
val toOffsets = seekToOffsets ++ seekToEndOffsets ++ seekToGapsOffsets

Expand All @@ -96,7 +92,7 @@ class OffsetsInitializer(
private def reportSkippedGaps(currentCommittedOffsets: Map[TopicPartition, Option[OffsetAndMetadata]]) = {
val skippedGaps = currentCommittedOffsets
.collect { case (tp, Some(om)) => tp -> om }
.map(tpom => tpom._1 -> parseGapsString(tpom._2.metadata))
.map(tpom => tpom._1 -> OffsetsAndGaps.parseGapsString(tpom._2.metadata))
.collect { case (tp, Some(gaps)) => tp -> gaps }
reporter(SkippedGapsOnInitialization(clientId, group, skippedGaps))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,24 @@ class OffsetsInitializerTest extends SpecificationWithJUnit with Mockito {
)
}

"commit offsets even when unknown metadata string is present in committed offsets" in
new ctx() {
givenCommittedOffsetsAndMetadata(partitions)(Map(p1 -> OffsetAndMetadata(randomInt, randomStr)))
givenPositions(p2 -> p2Pos, p3 -> p3Pos)

committer.initializeOffsets(partitions)

val missingOffsets = Map(
p2 -> p2Pos,
p3 -> p3Pos
)
there was
one(offsetOps).commitWithMetadata(
missingOffsets.mapValues(OffsetAndMetadata(_)),
timeout
)
}

class ctx(val seekTo: Map[TopicPartition, SeekTo] = Map.empty, offsetReset: OffsetReset = OffsetReset.Latest) extends Scope {
private val metricsLogRef = new AtomicReference(Seq.empty[GreyhoundMetric])
def reported = metricsLogRef.get
Expand Down Expand Up @@ -226,6 +244,12 @@ class OffsetsInitializerTest extends SpecificationWithJUnit with Mockito {
offsetOps.committedWithMetadata(partitions, timeout) returns result.mapValues(OffsetAndMetadata(_))
}

def givenCommittedOffsetsAndMetadata(partitions: Set[TopicPartition], timeout: zio.Duration = timeout)(
result: Map[TopicPartition, OffsetAndMetadata]
) = {
offsetOps.committedWithMetadata(partitions, timeout) returns result
}

def givenEndOffsets(partitions: Set[TopicPartition], timeout: zio.Duration = timeout)(result: Map[TopicPartition, Long]) = {
offsetOps.endOffsets(partitions, timeout) returns result
}
Expand Down

0 comments on commit b9180d0

Please sign in to comment.