[greyhound] parallel consumer implementation (#34061)
GitOrigin-RevId: 5c9cc1ea4fcc5935c6f905a5269ac17f6b82c294
ben-wattelman authored and wix-oss committed Sep 23, 2023
1 parent 3929d77 commit 862e338
Showing 16 changed files with 1,497 additions and 467 deletions.
854 changes: 475 additions & 379 deletions core/src/it/scala/com/wixpress/dst/greyhound/core/ConsumerIT.scala

Large diffs are not rendered by default.

@@ -0,0 +1,27 @@
package(default_visibility = ["//visibility:public"])

sources()

specs2_ite2e_test(
name = "parallel",
srcs = [
":sources",
],
deps = [
"//core/src/it/resources",
"//core/src/it/scala/com/wixpress/dst/greyhound/testenv",
"//core/src/it/scala/com/wixpress/dst/greyhound/testkit",
"//core/src/main/scala/com/wixpress/dst/greyhound/core",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/domain",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/metrics",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/producer",
"//core/src/main/scala/com/wixpress/dst/greyhound/core/zioutils",
"//core/src/test/scala/com/wixpress/dst/greyhound/core/testkit",
"@dev_zio_izumi_reflect_2_12",
"@dev_zio_zio_2_12",
"@dev_zio_zio_managed_2_12",
"@org_apache_kafka_kafka_clients",
],
)

Large diffs are not rendered by default.

@@ -10,5 +10,8 @@ object OffsetAndMetadata {
def apply(offsetAndMetadata: KafkaOffsetAndMetadata): OffsetAndMetadata =
OffsetAndMetadata(offsetAndMetadata.offset(), offsetAndMetadata.metadata())

def apply(offset: Offset): OffsetAndMetadata =
OffsetAndMetadata(offset, NO_METADATA)

val NO_METADATA = ""
}
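
A minimal usage sketch of the new single-argument overload (the offset value here is illustrative):

    val om = OffsetAndMetadata(42L)
    // equivalent to OffsetAndMetadata(42L, OffsetAndMetadata.NO_METADATA), i.e. an empty metadata string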
@@ -33,16 +33,24 @@ trait Consumer {

def commit(offsets: Map[TopicPartition, Offset])(implicit trace: Trace): RIO[GreyhoundMetrics, Unit]

def commitWithMetadata(offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata])(implicit trace: Trace): RIO[GreyhoundMetrics, Unit]

def endOffsets(partitions: Set[TopicPartition])(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]]

def beginningOffsets(partitions: Set[TopicPartition])(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]]

def committedOffsets(partitions: Set[TopicPartition])(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]]

def committedOffsetsAndMetadata(partitions: Set[TopicPartition])(implicit trace: Trace): RIO[Any, Map[TopicPartition, OffsetAndMetadata]]

def offsetsForTimes(topicPartitionsOnTimestamp: Map[TopicPartition, Long])(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]]

def commitOnRebalance(offsets: Map[TopicPartition, Offset])(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect]

def commitWithMetadataOnRebalance(offsets: Map[TopicPartition, OffsetAndMetadata])(
implicit trace: Trace
): RIO[GreyhoundMetrics, DelayedRebalanceEffect]

def pause(partitions: Set[TopicPartition])(implicit trace: Trace): ZIO[GreyhoundMetrics, IllegalStateException, Unit]

def resume(partitions: Set[TopicPartition])(implicit trace: Trace): ZIO[GreyhoundMetrics, IllegalStateException, Unit]
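
A hedged sketch of the new metadata-aware commit on the trait; the `consumer` value, topic name and metadata string below are illustrative, not part of this diff:

    val offsets: Map[TopicPartition, OffsetAndMetadata] =
      Map(TopicPartition("my-topic", 0) -> OffsetAndMetadata(42L, "gap-state"))

    // commits offset 42 with the caller-supplied metadata string, rather than
    // the globally configured cfg.commitMetadataString used by plain commit
    val commit: RIO[GreyhoundMetrics, Unit] = consumer.commitWithMetadata(offsets)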
@@ -96,7 +104,8 @@ object Consumer {
timeoutIfSeek = 10.seconds,
initialSeek = cfg.initialSeek,
rewindUncommittedOffsetsBy = cfg.rewindUncommittedOffsetsByMillis.millis,
offsetResetIsEarliest = cfg.offsetReset == OffsetReset.Earliest
offsetResetIsEarliest = cfg.offsetReset == OffsetReset.Earliest,
parallelConsumer = cfg.useParallelConsumer
)
} yield {
new Consumer {
@@ -144,10 +153,22 @@ object Consumer {
withConsumerBlocking(_.committed(kafkaPartitions(partitions)))
.map(_.asScala.collect { case (tp: KafkaTopicPartition, o: KafkaOffsetAndMetadata) => (TopicPartition(tp), o.offset) }.toMap)

override def committedOffsetsAndMetadata(
partitions: NonEmptySet[TopicPartition]
)(implicit trace: Trace): RIO[Any, Map[TopicPartition, OffsetAndMetadata]] =
withConsumerBlocking(_.committed(kafkaPartitions(partitions)))
.map(_.asScala.collect { case (tp: KafkaTopicPartition, om: KafkaOffsetAndMetadata) => (TopicPartition(tp), OffsetAndMetadata(om.offset, om.metadata)) }.toMap)

override def commit(offsets: Map[TopicPartition, Offset])(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = {
withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString))))
}

override def commitWithMetadata(
offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]
)(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = {
withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(offsetsAndMetadata)))
}

override def commitOnRebalance(
offsets: Map[TopicPartition, Offset]
)(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
@@ -157,6 +178,11 @@
ZIO.succeed(DelayedRebalanceEffect(consumer.commitSync(kOffsets)))
}

override def commitWithMetadataOnRebalance(
offsets: Map[TopicPartition, OffsetAndMetadata]
)(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] =
ZIO.succeed(DelayedRebalanceEffect(consumer.commitSync(kafkaOffsetsAndMetaData(offsets))))

override def pause(partitions: Set[TopicPartition])(implicit trace: Trace): ZIO[Any, IllegalStateException, Unit] =
withConsumer(_.pause(kafkaPartitions(partitions))).refineOrDie { case e: IllegalStateException => e }

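Note that the rebalance variant does not commit eagerly: the commitSync call is captured inside a DelayedRebalanceEffect, to be executed later during the rebalance. A sketch, with offsetsAndMetadata as in the example above:

    val delayed: RIO[GreyhoundMetrics, DelayedRebalanceEffect] =
      consumer.commitWithMetadataOnRebalance(offsetsAndMetadata)
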
@@ -285,7 +311,8 @@ case class ConsumerConfig(
consumerAttributes: Map[String, String] = Map.empty,
decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
rewindUncommittedOffsetsByMillis: Long = 0L
rewindUncommittedOffsetsByMillis: Long = 0L,
useParallelConsumer: Boolean = false
) extends CommonGreyhoundConfig {

override def kafkaProps: Map[String, String] = Map(
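
A sketch of opting in to the parallel consumer via the new flag; the bootstrapServers and groupId parameters stand in for the leading ConsumerConfig fields, which fall outside this hunk, and their values are illustrative:

    val config = ConsumerConfig(
      bootstrapServers = "localhost:9092", // illustrative
      groupId = "my-group",                // illustrative
      useParallelConsumer = true           // defaults to false
    )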
@@ -320,12 +347,16 @@ object OffsetReset {
trait UnsafeOffsetOperations {
def committed(partitions: Set[TopicPartition], timeout: zio.Duration): Map[TopicPartition, Offset]

def committedWithMetadata(partitions: Set[TopicPartition], timeout: zio.Duration): Map[TopicPartition, OffsetAndMetadata]

def beginningOffsets(partitions: Set[TopicPartition], timeout: zio.Duration): Map[TopicPartition, Offset]

def position(partition: TopicPartition, timeout: zio.Duration): Offset

def commit(offsets: Map[TopicPartition, Offset], timeout: Duration): Unit

def commitWithMetadata(offsets: Map[TopicPartition, OffsetAndMetadata], timeout: Duration): Unit

def seek(offsets: Map[TopicPartition, Offset]): Unit

def endOffsets(partitions: Set[TopicPartition], timeout: Duration): Map[TopicPartition, Offset]
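
Unlike the effectful Consumer API, these unsafe operations run synchronously against the underlying Kafka consumer with an explicit timeout. A sketch of the new metadata-aware commit; the `unsafeOps` value, topic and metadata string are illustrative:

    unsafeOps.commitWithMetadata(
      Map(TopicPartition("my-topic", 0) -> OffsetAndMetadata(42L, "gap-state")),
      timeout = zio.Duration.fromSeconds(5)
    )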
@@ -357,6 +388,20 @@ object UnsafeOffsetOperations {
}
}

override def committedWithMetadata(
partitions: NonEmptySet[TopicPartition],
timeout: zio.Duration
): Map[TopicPartition, OffsetAndMetadata] = {
consumer
.committed(partitions.map(_.asKafka).asJava, timeout)
.asScala
.toMap
.collect {
// committed() returns null for partitions with no committed offset; drop those entries
case (tp, ofm) if ofm != null =>
TopicPartition(tp) -> OffsetAndMetadata(ofm.offset(), ofm.metadata())
}
}

override def beginningOffsets(partitions: Set[TopicPartition], timeout: Duration): Map[TopicPartition, Offset] =
consumer
.beginningOffsets(partitions.map(_.asKafka).asJava, timeout)
@@ -374,6 +419,10 @@
consumer.commitSync(kafkaOffsets(offsets), timeout)
}

override def commitWithMetadata(offsets: Map[TopicPartition, OffsetAndMetadata], timeout: zio.Duration): Unit = {
consumer.commitSync(kafkaOffsetsAndMetaData(offsets), timeout)
}

override def seek(offsets: Map[TopicPartition, Offset]): Unit =
offsets.foreach { case (tp, offset) => Try(consumer.seek(tp.asKafka, offset)) }
