Skip to content

Commit

Permalink
[greyhound] parallel consumer - add OffsetsAndGaps visibility (#36685)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: cfb918350d137d551a9ca88d2e88a207e4b00fc6
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 8, 2023
1 parent 155a9cc commit e5f040e
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,17 @@ object Dispatcher {
override def waitForCurrentRecordsCompletion: UIO[Unit] =
workers.get.flatMap(workers => ZIO.foreach(workers.values)(_.waitForCurrentExecutionCompletion)).unit

override def revoke(partitions: Set[TopicPartition]): URIO[GreyhoundMetrics, Unit] =
workers
.modify { workers =>
val revoked = workers.filterKeys(partitions.contains)
val remaining = workers -- partitions

(revoked, remaining)
}
.flatMap(shutdownWorkers)
override def revoke(partitions: Set[TopicPartition]): URIO[GreyhoundMetrics, Unit] = {
report(DispatcherRevokingPartitions(clientId, group, partitions, consumerAttributes)) *>
workers
.modify { workers =>
val revoked = workers.filterKeys(partitions.contains)
val remaining = workers -- partitions

(revoked, remaining)
}
.flatMap(shutdownWorkers)
}

override def pause: URIO[GreyhoundMetrics, Unit] = for {
resume <- Promise.make[Nothing, Unit]
Expand Down Expand Up @@ -583,6 +585,13 @@ object DispatcherMetric {
attributes: Map[String, String]
) extends DispatcherMetric

case class DispatcherRevokingPartitions(
clientId: ClientId,
group: Group,
partitions: Set[TopicPartition],
attributes: Map[String, String]
) extends DispatcherMetric

case class ShuttingDownDispatcher(group: Group, clientId: ClientId, attributes: Map[String, String]) extends DispatcherMetric

case class WorkerStopped(group: Group, clientId: ClientId, partition: TopicPartition, durationMs: Long, attributes: Map[String, String])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ object EventLoop {
shutdownDispatcherAndReport(group, clientId, consumerAttributes, dispatcher)).disconnect.interruptible
.timeout(config.drainTimeout)
_ <- ZIO.when(drained.isEmpty)(
report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumerAttributes)) *> fiber.interruptFork
report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, onShutdown = true, consumerAttributes)) *>
fiber.interruptFork
)
_ <- if (config.consumePartitionInParallel) commitOffsetsAndGaps(consumer, offsetsAndGaps) else commitOffsets(consumer, offsets)
_ <- report(StoppedEventLoop(clientId, group, consumerAttributes))
Expand Down Expand Up @@ -229,7 +230,15 @@ object EventLoop {
_ <- pausedPartitionsRef.update(_ -- partitions)
isRevokeTimedOut <- dispatcher.revoke(partitions).timeout(config.drainTimeout).map(_.isEmpty)
_ <- ZIO.when(isRevokeTimedOut)(
report(DrainTimeoutExceeded(clientId, group, config.drainTimeout.toMillis, consumer.config.consumerAttributes))
report(
DrainTimeoutExceeded(
clientId,
group,
config.drainTimeout.toMillis,
onShutdown = false,
consumer.config.consumerAttributes
)
)
)
delayedRebalanceEffect <- if (useParallelConsumer) commitOffsetsAndGapsOnRebalance(consumer0, offsetsAndGaps)
else commitOffsetsOnRebalance(consumer0, offsets)
Expand Down Expand Up @@ -361,15 +370,16 @@ object EventLoop {
}

private def commitOffsetsAndGaps(consumer: Consumer, offsetsAndGaps: OffsetsAndGaps): URIO[GreyhoundMetrics, Unit] = {
offsetsAndGaps.getCommittableAndClear.flatMap { committable =>
val offsetsAndMetadataToCommit = OffsetsAndGaps.toOffsetsAndMetadata(committable)
report(CommittingOffsetsAndGaps(consumer.config.groupId, committable)) *>
consumer
.commitWithMetadata(offsetsAndMetadataToCommit)
.tap(_ => ZIO.when(offsetsAndMetadataToCommit.nonEmpty)(report(CommittedOffsetsAndGaps(committable))))
.catchAll { t =>
report(FailedToCommitOffsetsAndMetadata(t, offsetsAndMetadataToCommit)) *> offsetsAndGaps.setCommittable(committable)
}
offsetsAndGaps.getCommittableAndClear.flatMap {
case (committable, offsetsAndGapsBefore, offsetsAndGapsAfter) =>
val offsetsAndMetadataToCommit = OffsetsAndGaps.toOffsetsAndMetadata(committable)
report(CommittingOffsetsAndGaps(consumer.config.groupId, committable, offsetsAndGapsBefore, offsetsAndGapsAfter)) *>
consumer
.commitWithMetadata(offsetsAndMetadataToCommit)
.tap(_ => ZIO.when(offsetsAndMetadataToCommit.nonEmpty)(report(CommittedOffsetsAndGaps(committable))))
.catchAll { t =>
report(FailedToCommitOffsetsAndMetadata(t, offsetsAndMetadataToCommit)) *> offsetsAndGaps.setCommittable(committable)
}
}
}

Expand Down Expand Up @@ -397,11 +407,13 @@ object EventLoop {
offsetsAndGaps: OffsetsAndGaps
): URIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
for {
committable <- offsetsAndGaps.getCommittableAndClear
tle <- consumer
.commitWithMetadataOnRebalance(OffsetsAndGaps.toOffsetsAndMetadata(committable))
.catchAll { _ => offsetsAndGaps.setCommittable(committable) *> DelayedRebalanceEffect.zioUnit }
runtime <- ZIO.runtime[Any]
committableResult <- offsetsAndGaps.getCommittableAndClear
(committable, offsetsAndGapsBefore, offsetsAndGapsAfter) = committableResult
_ <- report(CommittingOffsetsAndGaps(consumer.config.groupId, committable, offsetsAndGapsBefore, offsetsAndGapsAfter))
tle <- consumer
.commitWithMetadataOnRebalance(OffsetsAndGaps.toOffsetsAndMetadata(committable))
.catchAll { _ => offsetsAndGaps.setCommittable(committable) *> DelayedRebalanceEffect.zioUnit }
runtime <- ZIO.runtime[Any]
} yield tle.catchAll { _ =>
zio.Unsafe.unsafe { implicit s =>
runtime.unsafe
Expand Down Expand Up @@ -481,6 +493,8 @@ object EventLoopMetric {
case class CommittingOffsetsAndGaps(
groupId: Group,
offsetsAndGaps: Map[TopicPartition, OffsetAndGaps],
offsetsAndGapsBefore: Map[TopicPartition, OffsetAndGaps],
offsetsAndGapsAfter: Map[TopicPartition, OffsetAndGaps],
attributes: Map[String, String] = Map.empty
) extends EventLoopMetric

Expand All @@ -493,8 +507,13 @@ object EventLoopMetric {

case class DispatcherStopped(group: Group, clientId: ClientId, durationMs: Long, attributes: Map[String, String]) extends EventLoopMetric

case class DrainTimeoutExceeded(clientId: ClientId, group: Group, timeoutMs: Long, attributes: Map[String, String] = Map.empty)
extends EventLoopMetric
case class DrainTimeoutExceeded(
clientId: ClientId,
group: Group,
timeoutMs: Long,
onShutdown: Boolean,
attributes: Map[String, String] = Map.empty
) extends EventLoopMetric

case class HighWatermarkReached(partition: TopicPartition, onOffset: Offset, attributes: Map[String, String] = Map.empty)
extends EventLoopMetric
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import scala.util.Try
trait OffsetsAndGaps {
def init(committedOffsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit]

def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]]
def getCommittableAndClear
: UIO[(Map[TopicPartition, OffsetAndGaps], Map[TopicPartition, OffsetAndGaps], Map[TopicPartition, OffsetAndGaps])]

def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]]

Expand Down Expand Up @@ -44,11 +45,12 @@ object OffsetsAndGaps {
override def init(committedOffsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit] =
ref.update(_ => committedOffsets)

override def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]] =
override def getCommittableAndClear
: UIO[(Map[TopicPartition, OffsetAndGaps], Map[TopicPartition, OffsetAndGaps], Map[TopicPartition, OffsetAndGaps])] =
ref.modify(offsetsAndGaps => {
val committable = offsetsAndGaps.filter(_._2.committable)
val updated = offsetsAndGaps.mapValues(_.markCommitted)
(committable, updated)
val updated = offsetsAndGaps.map { case (tp, og) => tp -> og.markCommitted }
((committable, offsetsAndGaps, updated), updated)
})

override def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv {
offsetGaps <- OffsetsAndGaps.make
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L))
_ <- offsetGaps.update(topicPartition, Seq(2L, 5L))
getCommittableAndClear <- offsetGaps.getCommittableAndClear
getCommittableAndClear <- offsetGaps.getCommittableAndClear.map(_._1)
} yield getCommittableAndClear must havePair(topicPartition -> OffsetAndGaps(7L, Seq(Gap(0L, 0L), Gap(4L, 4L), Gap(6L, 6L))))
}

Expand All @@ -30,7 +30,7 @@ class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv {
offsetGaps <- OffsetsAndGaps.make
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L))
_ <- offsetGaps.getCommittableAndClear
getCommittableAndClear <- offsetGaps.getCommittableAndClear
getCommittableAndClear <- offsetGaps.getCommittableAndClear.map(_._1)
} yield getCommittableAndClear must beEmpty
}

Expand All @@ -52,7 +52,7 @@ class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv {
_ <- offsetGaps.update(partition0, Seq(1L))
_ <- offsetGaps.update(partition0, Seq(0L))
_ <- offsetGaps.update(partition1, Seq(0L))
current <- offsetGaps.getCommittableAndClear
current <- offsetGaps.getCommittableAndClear.map(_._1)
} yield current must havePairs(partition0 -> OffsetAndGaps(1L, Seq()), partition1 -> OffsetAndGaps(0L, Seq()))
}

Expand All @@ -66,7 +66,7 @@ class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv {
_ <- offsetGaps.init(initialCommittedOffsets)
_ <- offsetGaps.update(partition0, Seq(101L, 102L))
_ <- offsetGaps.update(partition1, Seq(203L, 204L))
current <- offsetGaps.getCommittableAndClear
current <- offsetGaps.getCommittableAndClear.map(_._1)
} yield current must havePairs(partition0 -> OffsetAndGaps(102L, Seq()), partition1 -> OffsetAndGaps(204L, Seq(Gap(201L, 202L))))
}

Expand Down

0 comments on commit e5f040e

Please sign in to comment.