Skip to content

Commit

Permalink
[greyhound] parallel consumer visibility (#34926)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 3cbafbea85e93a678fc7528f1b3f4bb75514b9f2
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 11, 2023
1 parent 86d8e73 commit fb833c6
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ object Dispatcher {
override def revoke(partitions: Set[TopicPartition]): URIO[GreyhoundMetrics, Unit] =
workers
.modify { workers =>
val revoked = workers.filterKeys(partitions.contains)
val revoked = workers.filterKeys(partitions.contains)
val remaining = workers -- partitions

(revoked, remaining)
Expand Down Expand Up @@ -246,13 +246,23 @@ object Dispatcher {
queue <- Queue.dropping[Record](capacity)
internalState <- TRef.make(WorkerInternalState.empty).commit
fiber <-
(reportWorkerRunningInInterval(every = 60.seconds, internalState)(partition, group, clientId).forkDaemon *>
(if (consumeInParallel)
pollBatch(status, internalState, handle, queue, group, clientId, partition, consumerAttributes, maxParallelism, updateBatch, currentGaps)
else pollOnce(status, internalState, handle, queue, group, clientId, partition, consumerAttributes))
.repeatWhile(_ == true))
.interruptible
.forkDaemon
(reportWorkerRunningInInterval(every = 60.seconds, internalState)(partition, group, clientId).forkDaemon *>
(if (consumeInParallel)
pollBatch(
status,
internalState,
handle,
queue,
group,
clientId,
partition,
consumerAttributes,
maxParallelism,
updateBatch,
currentGaps
)
else pollOnce(status, internalState, handle, queue, group, clientId, partition, consumerAttributes))
.repeatWhile(_ == true)).interruptible.forkDaemon
} yield new Worker {
override def submit(record: Record): URIO[Any, Boolean] =
queue
Expand Down Expand Up @@ -406,17 +416,18 @@ object Dispatcher {
)
groupedRecords = records.groupBy(_.key).values // todo: add sub-grouping for records without key
latestCommitGaps <- currentGaps(records.map(r => TopicPartition(r.topic, r.partition)).toSet)
_ <- ZIO
.foreachParDiscard(groupedRecords)(sameKeyRecords =>
ZIO.foreach(sameKeyRecords) { record =>
if (shouldRecordBeHandled(record, latestCommitGaps)) {
handle(record).interruptible.ignore *> updateBatch(sameKeyRecords).interruptible
} else
report(SkippedPreviouslyHandledRecord(record, group, clientId, consumerAttributes))

}
)
.withParallelism(maxParallelism)
// Report how many handlers will actually run concurrently, then process the batch.
// Each key group is handled by one parallel branch and the number of concurrent branches
// is capped by `withParallelism(maxParallelism)`, so the effective parallelism is the
// smaller of the two values (the previous `Math.max` over-reported it).
_ <- report(InvokingHandlersInParallel(Math.min(groupedRecords.size, maxParallelism))) *>
  ZIO
    .foreachParDiscard(groupedRecords)(sameKeyRecords =>
      // Records sharing a key are handled sequentially, in the order of the group.
      ZIO.foreach(sameKeyRecords) { record =>
        if (shouldRecordBeHandled(record, latestCommitGaps)) {
          // Handler failures are ignored here; the batch is updated regardless.
          handle(record).interruptible.ignore *> updateBatch(sameKeyRecords).interruptible
        } else
          report(SkippedPreviouslyHandledRecord(record, group, clientId, consumerAttributes))

      }
    )
    .withParallelism(maxParallelism)
res <- isActive(internalState)
} yield res
}
Expand Down Expand Up @@ -568,6 +579,8 @@ object DispatcherMetric {
currentExecutionStarted: Option[Long]
) extends DispatcherMetric

/**
 * Dispatcher metric reported immediately before a polled batch's per-key record groups are handed to
 * `ZIO.foreachParDiscard`.
 *
 * @param numHandlers the handler count computed at the report site
 */
case class InvokingHandlersInParallel(numHandlers: Int) extends DispatcherMetric

/**
 * Dispatcher metric reported by the parallel consumption path instead of invoking the handler, for a record
 * for which `shouldRecordBeHandled(record, latestCommitGaps)` returned false.
 *
 * @param record     the record that was skipped
 * @param group      consumer group passed by the reporting worker
 * @param clientId   client id passed by the reporting worker
 * @param attributes consumer attributes passed by the reporting worker
 */
case class SkippedPreviouslyHandledRecord(record: Record, group: Group, clientId: ClientId, attributes: Map[String, String])
extends DispatcherMetric

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ object EventLoop {
val offsetsAndMetadataToCommit = OffsetsAndGaps.toOffsetsAndMetadata(committable)
consumer
.commitWithMetadata(offsetsAndMetadataToCommit)
.tap(_ => report(CommittedOffsetsAndMetadata(offsetsAndMetadataToCommit)))
.tap(_ => ZIO.when(offsetsAndMetadataToCommit.nonEmpty)(report(CommittedOffsetsAndMetadata(offsetsAndMetadataToCommit))))
.catchAll { t =>
report(FailedToCommitOffsetsAndMetadata(t, offsetsAndMetadataToCommit)) *> offsetsAndGaps.setCommittable(committable)
}
Expand Down Expand Up @@ -331,7 +331,8 @@ object EventLoop {
.commitWithMetadataOnRebalance(OffsetsAndGaps.toOffsetsAndMetadata(committable))
.catchAll { _ => offsetsAndGaps.setCommittable(committable) *> DelayedRebalanceEffect.zioUnit }
runtime <- ZIO.runtime[Any]
} yield tle.catchAll { _ => zio.Unsafe.unsafe { implicit s =>
} yield tle.catchAll { _ =>
zio.Unsafe.unsafe { implicit s =>
runtime.unsafe
.run(offsetsAndGaps.setCommittable(committable))
.getOrThrowFiberFailure()
Expand Down Expand Up @@ -407,6 +408,14 @@ object EventLoopMetric {

/**
 * Event-loop metric carrying a failure `t` together with the reporting `clientId` and optional `attributes`
 * (empty by default).
 *
 * NOTE(review): the name suggests it is reported when updating consumer positions fails; the report site is
 * not visible here - confirm.
 */
case class FailedToUpdatePositions(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

/**
 * Event-loop metric carrying a failure `t` plus the `clientId`, consumer `group`, the affected `partitions`
 * and optional `attributes` (empty by default).
 *
 * NOTE(review): presumably reported when refreshing offset gaps for newly assigned partitions fails; the
 * report site is not visible here - confirm.
 */
case class FailedToUpdateGapsOnPartitionAssignment(
t: Throwable,
clientId: ClientId,
group: Group,
partitions: Set[TopicPartition],
attributes: Map[String, String] = Map.empty
) extends EventLoopMetric

/**
 * Event-loop metric carrying a failure `t` together with the reporting `clientId` and optional `attributes`
 * (empty by default).
 *
 * NOTE(review): presumably reported when fetching previously committed gaps fails; the report site is not
 * visible here - confirm.
 */
case class FailedToFetchCommittedGaps(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty)
extends EventLoopMetric

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ trait OffsetsAndGaps {

/** Returns the gaps currently known for `partition`. The effect cannot fail (`UIO`). */
def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]]

def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit]
/**
 * Primary `update` entry point: records the offsets in `batch` for `partition`. The other `update` overloads
 * of this trait delegate here with `prevCommittedOffset = None`.
 *
 * @param partition           partition the offsets belong to
 * @param batch               offsets to record
 * @param prevCommittedOffset optional previously committed offset
 *                            (NOTE(review): its exact effect is implementation-defined and not visible here)
 */
def update(partition: TopicPartition, batch: Seq[Offset], prevCommittedOffset: Option[Offset]): UIO[Unit]

/**
 * Convenience overload for a single record: records the record's offset as a one-element batch on the
 * partition derived via `RecordTopicPartition(record)`.
 *
 * The two call lines below are the removed and the added line of this diff hunk; the added one passes `None`
 * as the previously committed offset.
 */
def update(record: ConsumerRecord[_, _]): UIO[Unit] =
update(RecordTopicPartition(record), Seq(record.offset))
update(RecordTopicPartition(record), Seq(record.offset), None)

/**
 * Records a whole chunk of consumed records.
 *
 * The records are sorted by offset; the partition is taken from the first sorted record, and the recorded
 * batch consists of every record offset followed by `lastOffset + 1`. No previously committed offset is
 * supplied (`None`).
 *
 * An empty chunk is a no-op. Previously `head` / `last` threw on an empty chunk while the effect was being
 * constructed, i.e. outside of the returned `UIO`.
 *
 * @param records records to record (NOTE(review): the partition is derived from the first record only, so
 *                the chunk is assumed to hold records of a single partition - confirm with callers)
 */
def update(records: Chunk[ConsumerRecord[_, _]]): UIO[Unit] =
  if (records.isEmpty) zio.ZIO.unit
  else {
    val sortedBatch = records.sortBy(_.offset)
    update(RecordTopicPartition(sortedBatch.head), sortedBatch.map(_.offset) ++ Seq(sortedBatch.last.offset + 1), None)
  }

/** Two-argument overload: delegates to the three-argument `update` with `None` as the previously committed offset. */
def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit] =
update(partition, batch, None)

/**
 * Stores the given per-partition offsets-and-gaps as committable state. The effect cannot fail (`UIO`).
 *
 * NOTE(review): presumably used to put back offsets whose commit failed so they can be retried - confirm
 * against the callers.
 */
def setCommittable(offsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit]

/**
 * Checks `offset` against the state tracked for `partition`. The effect cannot fail (`UIO`).
 *
 * NOTE(review): exact membership semantics are implementation-defined and not visible here - confirm.
 */
def contains(partition: TopicPartition, offset: Offset): UIO[Boolean]
Expand All @@ -40,7 +43,7 @@ object OffsetsAndGaps {
// Gaps tracked for `partition`, ordered by their start offset; empty when the partition has no entry.
override def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] =
  ref.get.map { tracked =>
    tracked.get(partition) match {
      case Some(partitionState) => partitionState.gaps.sortBy(_.start)
      case None                 => Seq.empty[Gap]
    }
  }

override def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit] =
override def update(partition: TopicPartition, batch: Seq[Offset], prevCommittedOffset: Option[Offset]): UIO[Unit] =
ref.update { offsetsAndGaps =>
val sortedBatch = batch.sorted
val maxBatchOffset = sortedBatch.last
Expand Down

0 comments on commit fb833c6

Please sign in to comment.