Skip to content

Commit

Permalink
[greyhound] parallel consumer - remove noisy logs (#36777)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 67e32052558deb48f4373262a724d1ccd1751b82
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 3, 2023
1 parent cee1cac commit 10ffeb2
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ object EventLoop {
_ <- ZIO.when(records.isEmpty)(ZIO.sleep(50.millis))
} yield true

case ShuttingDown => report(PollOnceFiberShuttingDown(clientId, group, consumer.config.consumerAttributes)) *> ZIO.succeed(false)
case Paused => report(PollOnceFiberPaused(clientId, group, consumer.config.consumerAttributes)) *> ZIO.sleep(100.millis).as(true)
case ShuttingDown => ZIO.succeed(false)
case Paused => ZIO.sleep(100.millis).as(true)
}

private def listener(
Expand Down Expand Up @@ -370,16 +370,14 @@ object EventLoop {
}

private def commitOffsetsAndGaps(consumer: Consumer, offsetsAndGaps: OffsetsAndGaps): URIO[GreyhoundMetrics, Unit] = {
offsetsAndGaps.getCommittableAndClear.flatMap {
case (committable, offsetsAndGapsBefore, offsetsAndGapsAfter) =>
val offsetsAndMetadataToCommit = OffsetsAndGaps.toOffsetsAndMetadata(committable)
report(CommittingOffsetsAndGaps(consumer.config.groupId, committable, offsetsAndGapsBefore, offsetsAndGapsAfter)) *>
consumer
.commitWithMetadata(offsetsAndMetadataToCommit)
.tap(_ => ZIO.when(offsetsAndMetadataToCommit.nonEmpty)(report(CommittedOffsetsAndGaps(committable))))
.catchAll { t =>
report(FailedToCommitOffsetsAndMetadata(t, offsetsAndMetadataToCommit)) *> offsetsAndGaps.setCommittable(committable)
}
offsetsAndGaps.getCommittableAndClear.flatMap { committable =>
val offsetsAndMetadataToCommit = OffsetsAndGaps.toOffsetsAndMetadata(committable)
consumer
.commitWithMetadata(offsetsAndMetadataToCommit)
.tap(_ => ZIO.when(offsetsAndMetadataToCommit.nonEmpty)(report(CommittedOffsetsAndGaps(committable))))
.catchAll { t =>
report(FailedToCommitOffsetsAndMetadata(t, offsetsAndMetadataToCommit)) *> offsetsAndGaps.setCommittable(committable)
}
}
}

Expand Down Expand Up @@ -407,13 +405,11 @@ object EventLoop {
offsetsAndGaps: OffsetsAndGaps
): URIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
for {
committableResult <- offsetsAndGaps.getCommittableAndClear
(committable, offsetsAndGapsBefore, offsetsAndGapsAfter) = committableResult
_ <- report(CommittingOffsetsAndGaps(consumer.config.groupId, committable, offsetsAndGapsBefore, offsetsAndGapsAfter))
tle <- consumer
.commitWithMetadataOnRebalance(OffsetsAndGaps.toOffsetsAndMetadata(committable))
.catchAll { _ => offsetsAndGaps.setCommittable(committable) *> DelayedRebalanceEffect.zioUnit }
runtime <- ZIO.runtime[Any]
committable <- offsetsAndGaps.getCommittableAndClear
tle <- consumer
.commitWithMetadataOnRebalance(OffsetsAndGaps.toOffsetsAndMetadata(committable))
.catchAll { _ => offsetsAndGaps.setCommittable(committable) *> DelayedRebalanceEffect.zioUnit }
runtime <- ZIO.runtime[Any]
} yield tle.catchAll { _ =>
zio.Unsafe.unsafe { implicit s =>
runtime.unsafe
Expand Down Expand Up @@ -493,8 +489,6 @@ object EventLoopMetric {
case class CommittingOffsetsAndGaps(
groupId: Group,
offsetsAndGaps: Map[TopicPartition, OffsetAndGaps],
offsetsAndGapsBefore: Map[TopicPartition, OffsetAndGaps],
offsetsAndGapsAfter: Map[TopicPartition, OffsetAndGaps],
attributes: Map[String, String] = Map.empty
) extends EventLoopMetric

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ import scala.util.Try
trait OffsetsAndGaps {
def init(committedOffsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit]

def getCommittableAndClear
: UIO[(Map[TopicPartition, OffsetAndGaps], Map[TopicPartition, OffsetAndGaps], Map[TopicPartition, OffsetAndGaps])]
def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]]

def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]]

Expand Down Expand Up @@ -45,12 +44,11 @@ object OffsetsAndGaps {
override def init(committedOffsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit] =
ref.update(_ => committedOffsets)

override def getCommittableAndClear
: UIO[(Map[TopicPartition, OffsetAndGaps], Map[TopicPartition, OffsetAndGaps], Map[TopicPartition, OffsetAndGaps])] =
override def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]] =
ref.modify(offsetsAndGaps => {
val committable = offsetsAndGaps.filter(_._2.committable)
val updated = offsetsAndGaps.map { case (tp, og) => tp -> og.markCommitted }
((committable, offsetsAndGaps, updated), updated)
(committable, updated)
})

override def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv {
offsetGaps <- OffsetsAndGaps.make
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L))
_ <- offsetGaps.update(topicPartition, Seq(2L, 5L))
getCommittableAndClear <- offsetGaps.getCommittableAndClear.map(_._1)
getCommittableAndClear <- offsetGaps.getCommittableAndClear
} yield getCommittableAndClear must havePair(topicPartition -> OffsetAndGaps(7L, Seq(Gap(0L, 0L), Gap(4L, 4L), Gap(6L, 6L))))
}

Expand All @@ -30,7 +30,7 @@ class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv {
offsetGaps <- OffsetsAndGaps.make
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L))
_ <- offsetGaps.getCommittableAndClear
getCommittableAndClear <- offsetGaps.getCommittableAndClear.map(_._1)
getCommittableAndClear <- offsetGaps.getCommittableAndClear
} yield getCommittableAndClear must beEmpty
}

Expand All @@ -52,7 +52,7 @@ class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv {
_ <- offsetGaps.update(partition0, Seq(1L))
_ <- offsetGaps.update(partition0, Seq(0L))
_ <- offsetGaps.update(partition1, Seq(0L))
current <- offsetGaps.getCommittableAndClear.map(_._1)
current <- offsetGaps.getCommittableAndClear
} yield current must havePairs(partition0 -> OffsetAndGaps(1L, Seq()), partition1 -> OffsetAndGaps(0L, Seq()))
}

Expand All @@ -66,7 +66,7 @@ class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv {
_ <- offsetGaps.init(initialCommittedOffsets)
_ <- offsetGaps.update(partition0, Seq(101L, 102L))
_ <- offsetGaps.update(partition1, Seq(203L, 204L))
current <- offsetGaps.getCommittableAndClear.map(_._1)
current <- offsetGaps.getCommittableAndClear
} yield current must havePairs(partition0 -> OffsetAndGaps(102L, Seq()), partition1 -> OffsetAndGaps(204L, Seq(Gap(201L, 202L))))
}

Expand Down

0 comments on commit 10ffeb2

Please sign in to comment.