gh RecordConsumer - allow to override offsetReset with autoResetOffset from extra properties taking into account a non-zero rewindUncommittedOffsetsBy (#35863)

* gh RecordConsumer - allow to override offsetReset with autoResetOffset from extra properties taking into account a non-zero rewindUncommittedOffsetsBy #automerge

* fix tests depending on com.wixpress.greyhound.GreyhoundTestingSupport

* try another tactic to fix tests

* fix build file #automerge

GitOrigin-RevId: dcf84b5740a3f99e1a637fca84c0ef6059b0e1a9
natansil authored and wix-oss committed Sep 8, 2023
1 parent 79eeb15 commit 21b10ea
Showing 2 changed files with 89 additions and 78 deletions.
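The change in a nutshell: `ConsumerConfig` carries both a typed `offsetReset` field and a raw `extraProperties` map, and until this commit the startup offsets initializer consulted only the typed field. Now an explicit `auto.offset.reset` entry in `extraProperties` wins, including on the seek path that is exercised when `rewindUncommittedOffsetsBy` is non-zero. A minimal sketch of the configuration this enables, using the `ConsumerConfig` fields from the diff below (the broker address, group id, and import path are illustrative assumptions):

```scala
import com.wixpress.dst.greyhound.core.consumer.{ConsumerConfig, OffsetReset} // assumed package

val config = ConsumerConfig(
  bootstrapServers = "localhost:9092", // illustrative broker
  groupId = "example-group",           // illustrative group
  offsetReset = OffsetReset.Latest,    // the typed "default"
  // raw Kafka property; after this commit it overrides the typed field
  extraProperties = Map("auto.offset.reset" -> "earliest"),
  // non-zero, so the offsets initializer actively seeks on startup
  rewindUncommittedOffsetsByMillis = 1L
)

// New helper introduced by this commit: the extraProperties entry takes precedence.
assert(config.offsetResetIsEarliest) // true, despite offsetReset = Latest
```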
63 changes: 32 additions & 31 deletions core/src/it/scala/com/wixpress/dst/greyhound/core/ConsumerIT.scala
@@ -1,8 +1,5 @@
package com.wixpress.dst.greyhound.core

import java.util.concurrent.{TimeUnit, TimeoutException}
import java.util.regex.Pattern
import java.util.regex.Pattern.compile
import com.wixpress.dst.greyhound.core.Serdes._
import com.wixpress.dst.greyhound.core.consumer.ConsumerMetric.PollingFailed
import com.wixpress.dst.greyhound.core.consumer.EventLoop.Handler
@@ -14,18 +11,18 @@ import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetric
import com.wixpress.dst.greyhound.core.producer.ProducerRecord
import com.wixpress.dst.greyhound.core.testkit.RecordMatchers._
import com.wixpress.dst.greyhound.core.testkit.{eventuallyTimeoutFail, eventuallyZ, AwaitableRef, BaseTestWithSharedEnv, TestMetrics}
import com.wixpress.dst.greyhound.core.zioutils.CountDownLatch
import com.wixpress.dst.greyhound.core.zioutils.Gate
import com.wixpress.dst.greyhound.core.zioutils.{CountDownLatch, Gate}
import com.wixpress.dst.greyhound.testenv.ITEnv
import com.wixpress.dst.greyhound.testenv.ITEnv.{clientId, _}
import com.wixpress.dst.greyhound.testkit.ManagedKafka
import org.specs2.specification.core.Fragments
import zio.Clock
import zio.stm.{STM, TRef}
import zio._
import zio.{Clock, Console, _}
import zio.Clock.sleep
import zio.managed._
import zio.{Clock, _}
import zio.stm.{STM, TRef}

import java.util.concurrent.{TimeUnit, TimeoutException}
import java.util.regex.Pattern
import java.util.regex.Pattern.compile

class ConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
sequential
@@ -390,36 +387,40 @@ class ConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
}
}

s"allow to override offsetReset with autoResetOffset from extra properties${parallelConsumerString(useParallelConsumer)}" in
s"allow to override offsetReset with autoResetOffset from extra properties taking into account a non-zero rewindUncommittedOffsetsBy${parallelConsumerString(useParallelConsumer)}" in
ZIO.scoped {
for {
r <- getShared
r <- getShared
TestResources(kafka, producer) = r
_ <- ZIO.debug(">>>> starting test: earliestTest")
topic <- kafka.createRandomTopic(prefix = "core-from-earliest")
group <- randomGroup
_ <- ZIO.debug(">>>> starting test: earliestTest")
topic <- kafka.createRandomTopic(prefix = "core-from-earliest")
group <- randomGroup

queue <- Queue.unbounded[ConsumerRecord[String, String]]
queue <- Queue.unbounded[ConsumerRecord[String, String]]
handler = RecordHandler(queue.offer(_: ConsumerRecord[String, String]))
.withDeserializers(StringSerde, StringSerde)
.ignore
.withDeserializers(StringSerde, StringSerde)
.ignore

record = ProducerRecord(topic, "bar", Some("foo"))
_ <- producer.produce(record, StringSerde, StringSerde)
_ <- producer.produce(record, StringSerde, StringSerde)

message <- RecordConsumer
.make(
configFor(
kafka,
group,
topic,
mutateEventLoop = _.copy(consumePartitionInParallel = useParallelConsumer, maxParallelism = 8)
)
.copy(offsetReset = Latest, extraProperties = Map("auto.offset.reset" -> "earliest")),
handler
)
.flatMap { _ => queue.take }
.timeout(10.seconds)
.make(
configFor(
kafka,
group,
topic,
mutateEventLoop = _.copy(consumePartitionInParallel = useParallelConsumer, maxParallelism = 8)
)
.copy(
offsetReset = Latest, // "default"
extraProperties = Map("auto.offset.reset" -> "earliest"), // the "custom property" that overrides the default
rewindUncommittedOffsetsBy = 1.millis // non-zero so that we can check that it's taken into account
),
handler
)
.flatMap { _ => queue.take }
.timeout(10.seconds)
} yield {
message.get must (beRecordWithKey("foo") and beRecordWithValue("bar"))
}
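Why the test needs a non-zero `rewindUncommittedOffsetsBy`: the raw Kafka client already honored the override, because `kafkaProps` in `ConsumerConfig` (second changed file, below) ends with `++ extraProperties`, and on Scala maps the right-hand operand of `++` wins on key collisions. The mismatch only surfaced when a non-zero rewind made `OffsetsInitializer` seek on startup using the typed field alone; the test produces its record before subscribing, so it is received only if the effective policy is earliest. A minimal demonstration of the merge behavior involved (`typedProps` stands in for the collapsed middle of `kafkaProps`, which presumably derives `auto.offset.reset` from `offsetReset`):

```scala
// The right-hand operand of ++ wins on key collisions, so a raw
// "auto.offset.reset" entry in extraProperties overrides the derived one.
val typedProps      = Map("auto.offset.reset" -> "latest")   // stand-in for the derived property
val extraProperties = Map("auto.offset.reset" -> "earliest") // caller's override
val effective       = typedProps ++ extraProperties

assert(effective("auto.offset.reset") == "earliest")
```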
@@ -20,14 +20,14 @@ import scala.util.{Random, Try}

trait Consumer {
def subscribe[R1](
topics: Set[Topic],
rebalanceListener: RebalanceListener[R1] = RebalanceListener.Empty
)(implicit trace: Trace): RIO[GreyhoundMetrics with R1, Unit]
topics: Set[Topic],
rebalanceListener: RebalanceListener[R1] = RebalanceListener.Empty
)(implicit trace: Trace): RIO[GreyhoundMetrics with R1, Unit]

def subscribePattern[R1](
topicStartsWith: Pattern,
rebalanceListener: RebalanceListener[R1] = RebalanceListener.Empty
)(implicit trace: Trace): RIO[GreyhoundMetrics with R1, Unit]
topicStartsWith: Pattern,
rebalanceListener: RebalanceListener[R1] = RebalanceListener.Empty
)(implicit trace: Trace): RIO[GreyhoundMetrics with R1, Unit]

def poll(timeout: Duration)(implicit trace: Trace): RIO[GreyhoundMetrics, Records]

@@ -98,17 +98,17 @@ object Consumer {
// if a partition with no committed offset is revoked during processing
// we also may want to seek forward to some given initial offsets
offsetsInitializer <- OffsetsInitializer
.make(
cfg.clientId,
cfg.groupId,
UnsafeOffsetOperations.make(consumer),
timeout = 10.seconds,
timeoutIfSeek = 10.seconds,
initialSeek = cfg.initialSeek,
rewindUncommittedOffsetsBy = cfg.rewindUncommittedOffsetsByMillis.millis,
offsetResetIsEarliest = cfg.offsetReset == OffsetReset.Earliest,
parallelConsumer = cfg.useParallelConsumer
)
.make(
cfg.clientId,
cfg.groupId,
UnsafeOffsetOperations.make(consumer),
timeout = 10.seconds,
timeoutIfSeek = 10.seconds,
initialSeek = cfg.initialSeek,
rewindUncommittedOffsetsBy = cfg.rewindUncommittedOffsetsByMillis.millis,
offsetResetIsEarliest = cfg.offsetResetIsEarliest,
parallelConsumer = cfg.useParallelConsumer
)
} yield {
new Consumer {
override def subscribePattern[R1](topicStartsWith: Pattern, rebalanceListener: RebalanceListener[R1])(
@@ -156,33 +156,40 @@ object Consumer {
.map(_.asScala.collect { case (tp: KafkaTopicPartition, o: KafkaOffsetAndMetadata) => (TopicPartition(tp), o.offset) }.toMap)

override def committedOffsetsAndMetadata(
partitions: NonEmptySet[TopicPartition]
)(implicit trace: Trace): RIO[Any, Map[TopicPartition, OffsetAndMetadata]] =
partitions: NonEmptySet[TopicPartition]
)(implicit trace: Trace): RIO[Any, Map[TopicPartition, OffsetAndMetadata]] =
withConsumerBlocking(_.committed(kafkaPartitions(partitions)))
.map(_.asScala.collect { case (tp: KafkaTopicPartition, om: KafkaOffsetAndMetadata) => (TopicPartition(tp), OffsetAndMetadata(om.offset, om.metadata))}.toMap)
.map(
_.asScala
.collect {
case (tp: KafkaTopicPartition, om: KafkaOffsetAndMetadata) =>
(TopicPartition(tp), OffsetAndMetadata(om.offset, om.metadata))
}
.toMap
)

override def commit(offsets: Map[TopicPartition, Offset])(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = {
withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString))))
}

override def commitWithMetadata(
offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]
)(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = {
offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]
)(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = {
withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(offsetsAndMetadata)))
}

override def commitOnRebalance(
offsets: Map[TopicPartition, Offset]
)(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
offsets: Map[TopicPartition, Offset]
)(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
val kOffsets = kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString))
// we can't actually call commit here, as it needs to be called from the same
// thread that triggered poll(), so we return the commit action as a thunk
ZIO.succeed(DelayedRebalanceEffect(consumer.commitSync(kOffsets)))
}

override def commitWithMetadataOnRebalance(
offsets: Map[TopicPartition, OffsetAndMetadata]
)(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] =
offsets: Map[TopicPartition, OffsetAndMetadata]
)(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] =
ZIO.succeed(DelayedRebalanceEffect(consumer.commitSync(kafkaOffsetsAndMetaData(offsets))))

override def pause(partitions: Set[TopicPartition])(implicit trace: Trace): ZIO[Any, IllegalStateException, Unit] =
@@ -234,8 +241,8 @@ object Consumer {
semaphore.withPermit(f(consumer))

override def offsetsForTimes(
topicPartitionsOnTimestamp: Map[TopicPartition, Long]
)(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]] = {
topicPartitionsOnTimestamp: Map[TopicPartition, Long]
)(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]] = {
val kafkaTopicPartitionsOnTimestamp = topicPartitionsOnTimestamp.map { case (tp, ts) => tp.asKafka -> ts }
withConsumerBlocking(_.offsetsForTimes(kafkaTopicPartitionsOnTimestamp.mapValues(l => new lang.Long(l)).toMap.asJava))
.map(
@@ -291,9 +298,9 @@ object Consumer {
}

private def makeConsumer(
config: ConsumerConfig,
semaphore: Semaphore
)(implicit trace: Trace): RIO[GreyhoundMetrics with Scope, KafkaConsumer[Chunk[Byte], Chunk[Byte]]] = {
config: ConsumerConfig,
semaphore: Semaphore
)(implicit trace: Trace): RIO[GreyhoundMetrics with Scope, KafkaConsumer[Chunk[Byte], Chunk[Byte]]] = {
val acquire = ZIO.attemptBlocking(new KafkaConsumer(config.properties, deserializer, deserializer))
def close(consumer: KafkaConsumer[_, _]) =
attemptBlocking(consumer.close())
@@ -306,19 +313,19 @@
}

case class ConsumerConfig(
bootstrapServers: String,
groupId: Group,
clientId: ClientId = s"wix-consumer-${Random.alphanumeric.take(5).mkString}",
offsetReset: OffsetReset = OffsetReset.Latest,
extraProperties: Map[String, String] = Map.empty,
additionalListener: RebalanceListener[Any] = RebalanceListener.Empty,
initialSeek: InitialOffsetsSeek = InitialOffsetsSeek.default,
consumerAttributes: Map[String, String] = Map.empty,
decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
rewindUncommittedOffsetsByMillis: Long = 0L,
useParallelConsumer: Boolean = false
) extends CommonGreyhoundConfig {
bootstrapServers: String,
groupId: Group,
clientId: ClientId = s"wix-consumer-${Random.alphanumeric.take(5).mkString}",
offsetReset: OffsetReset = OffsetReset.Latest,
extraProperties: Map[String, String] = Map.empty,
additionalListener: RebalanceListener[Any] = RebalanceListener.Empty,
initialSeek: InitialOffsetsSeek = InitialOffsetsSeek.default,
consumerAttributes: Map[String, String] = Map.empty,
decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
rewindUncommittedOffsetsByMillis: Long = 0L,
useParallelConsumer: Boolean = false
) extends CommonGreyhoundConfig {

override def kafkaProps: Map[String, String] = Map(
KafkaConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
@@ -334,6 +341,9 @@ case class ConsumerConfig(
KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
) ++ extraProperties

def offsetResetIsEarliest: Boolean =
extraProperties.get("auto.offset.reset").map(_ == "earliest").getOrElse(offsetReset == OffsetReset.Earliest)

def withExtraProperties(props: (String, String)*) =
copy(extraProperties = extraProperties ++ props)

@@ -394,9 +404,9 @@ object UnsafeOffsetOperations {
}

override def committedWithMetadata(
partitions: NonEmptySet[TopicPartition],
timeout: zio.Duration
): Map[TopicPartition, OffsetAndMetadata] = {
partitions: NonEmptySet[TopicPartition],
timeout: zio.Duration
): Map[TopicPartition, OffsetAndMetadata] = {
consumer
.committed(partitions.map(_.asKafka).asJava, timeout)
.asScala