diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/BlockingRetryRecordHandler.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/BlockingRetryRecordHandler.scala index 502e31e1..2f5d59a9 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/BlockingRetryRecordHandler.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/BlockingRetryRecordHandler.scala @@ -3,9 +3,8 @@ package com.wixpress.dst.greyhound.core.consumer.retry import java.util.concurrent.TimeUnit import com.wixpress.dst.greyhound.core.{Group, TopicPartition} import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, RecordHandler} -import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState.{Blocked, Blocking => InternalBlocking, IgnoringOnce} +import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState.{Blocked, IgnoringOnce, Blocking => InternalBlocking} import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandlerMetric.{BlockingRetryHandlerInvocationFailed, DoneBlockingBeforeRetry, NoRetryOnNonRetryableFailure} -import com.wixpress.dst.greyhound.core.consumer.retry.ZIOHelper.foreachWhile import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics.report import com.wixpress.dst.greyhound.core.zioutils.AwaitShutdown @@ -31,7 +30,7 @@ private[retry] object BlockingRetryRecordHandler { override def handle(record: ConsumerRecord[K, V])(implicit trace: Trace): ZIO[GreyhoundMetrics with R, Nothing, LastHandleResult] = { val topicPartition = TopicPartition(record.topic, record.partition) - def pollBlockingStateWithSuspensions(interval: Duration, start: Long): URIO[GreyhoundMetrics, PollResult] = { + def pollBlockingStateWithSuspensions(record: ConsumerRecord[K, V], interval: Duration, start: Long): URIO[GreyhoundMetrics, PollResult] = { for { shouldBlock <- blockingStateResolver.resolve(record) shouldPollAgain <- @@ -43,14 +42,14 @@ private[retry] object BlockingRetryRecordHandler { } yield shouldPollAgain } - def blockOnErrorFor(interval: Duration) = { + def blockOnErrorFor(record: ConsumerRecord[K, V], interval: Duration) = { for { start <- currentTime(TimeUnit.MILLISECONDS) continueBlocking <- if (interval.toMillis > 100L) { awaitShutdown(record.topicPartition).flatMap( _.interruptOnShutdown( - pollBlockingStateWithSuspensions(interval, start).repeatWhile(result => result.pollAgain).map(_.blockHandling) + pollBlockingStateWithSuspensions(record, interval, start).repeatWhile(result => result.pollAgain).map(_.blockHandling) ).reporting(r => DoneBlockingBeforeRetry(record.topic, record.partition, record.offset, r.duration, r.failed)) ) } else { @@ -63,6 +62,7 @@ private[retry] object BlockingRetryRecordHandler { } def handleAndMaybeBlockOnErrorFor( + record: ConsumerRecord[K, V], interval: Option[Duration] ): ZIO[R with GreyhoundMetrics, Nothing, LastHandleResult] = { handler.handle(record).map(_ => LastHandleResult(lastHandleSucceeded = true, shouldContinue = false)).catchAll { @@ -73,7 +73,7 @@ private[retry] object BlockingRetryRecordHandler { case error => interval .map { interval => - report(BlockingRetryHandlerInvocationFailed(topicPartition, record.offset, error.toString)) *> blockOnErrorFor(interval) + report(BlockingRetryHandlerInvocationFailed(topicPartition, record.offset, error.toString)) *> blockOnErrorFor(record, interval) } .getOrElse(ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false))) } @@ -96,13 +96,44 @@ private[retry] object BlockingRetryRecordHandler { } else { val durationsIncludingForInvocationWithNoErrorHandling = retryConfig.blockingBackoffs(record.topic)().map(Some(_)) :+ None for { - result <- foreachWhile(durationsIncludingForInvocationWithNoErrorHandling) { interval => handleAndMaybeBlockOnErrorFor(interval) } - _ <- maybeBackToStateBlocking + result <- retryEvery(record, durationsIncludingForInvocationWithNoErrorHandling) { (rec, interval) => + handleAndMaybeBlockOnErrorFor(rec, interval) + } + _ <- maybeBackToStateBlocking } yield result } } } + private def retryEvery[K, V, R, E](record: ConsumerRecord[K, V], as: Iterable[Option[Duration]])( + f: (ConsumerRecord[K, V], Option[Duration]) => ZIO[R, E, LastHandleResult] + )(implicit trace: Trace): ZIO[R, E, LastHandleResult] = { + ZIO.succeed(as.iterator).flatMap { i => + def loop(retryAttempt: Option[RetryAttempt]): ZIO[R, E, LastHandleResult] = + if (i.hasNext) { + val nextDelay = i.next + val recordWithAttempt = retryAttempt.fold(record) { attempt => + record.copy(headers = record.headers ++ RetryAttempt.toHeaders(attempt)) + } + f(recordWithAttempt, nextDelay).flatMap { result => + if (result.shouldContinue) Clock.instant.flatMap { now => + val nextAttempt = RetryAttempt( + originalTopic = record.topic, + attempt = retryAttempt.fold(0)(_.attempt + 1), + submittedAt = now, + backoff = nextDelay getOrElse Duration.Zero + ) + loop(Some(nextAttempt)) + } + else ZIO.succeed(result) + } + } + else ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false)) + + loop(None) + } + } + private def handleNonRetriable[K, V, E, R](record: ConsumerRecord[K, V], topicPartition: TopicPartition, cause: Exception) = report(NoRetryOnNonRetryableFailure(topicPartition, record.offset, cause)) .as(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false)) diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryHelper.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryHelper.scala index a73b7dce..1d1cd24c 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryHelper.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryHelper.scala @@ -1,32 +1,23 @@ package com.wixpress.dst.greyhound.core.consumer.retry -import java.time.{Duration => JavaDuration, Instant} -import java.util.concurrent.TimeUnit.MILLISECONDS -import java.util.regex.Pattern -import com.wixpress.dst.greyhound.core.Serdes.StringSerde import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription.{TopicPattern, Topics} -import com.wixpress.dst.greyhound.core.consumer.retry.RetryAttempt.{currentTime, RetryAttemptNumber} -import com.wixpress.dst.greyhound.core.consumer.retry.RetryDecision.{NoMoreRetries, RetryWith} import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, ConsumerSubscription} +import com.wixpress.dst.greyhound.core.consumer.retry.RetryDecision.{NoMoreRetries, RetryWith} import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandlerMetric.WaitingForRetry import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics.report import com.wixpress.dst.greyhound.core.producer.ProducerRecord -import com.wixpress.dst.greyhound.core.{durationDeserializer, instantDeserializer, Group, Headers, Topic} -import zio.Clock -import zio.Duration +import com.wixpress.dst.greyhound.core.{Group, Topic} import zio.Schedule.spaced -import zio.{Chunk, UIO, URIO, _} +import zio.{Chunk, Clock, Duration, URIO, _} +import java.time.Instant +import java.util.regex.Pattern import scala.util.Try trait NonBlockingRetryHelper { def retryTopicsFor(originalTopic: Topic): Set[Topic] - def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)( - implicit trace: Trace - ): UIO[Option[RetryAttempt]] - def retryDecision[E]( retryAttempt: Option[RetryAttempt], record: ConsumerRecord[Chunk[Byte], Chunk[Byte]], @@ -34,7 +25,7 @@ trait NonBlockingRetryHelper { subscription: ConsumerSubscription )(implicit trace: Trace): URIO[Any, RetryDecision] - def retrySteps = retryTopicsFor("").size + def retrySteps: Int = retryTopicsFor("").size } object NonBlockingRetryHelper { @@ -49,82 +40,70 @@ object NonBlockingRetryHelper { override def retryTopicsFor(topic: Topic): Set[Topic] = policy(topic).intervals.indices.foldLeft(Set.empty[String])((acc, attempt) => acc + s"$topic-$group-retry-$attempt") - override def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)( - implicit trace: Trace - ): UIO[Option[RetryAttempt]] = { - (for { - submitted <- headers.get(RetryHeader.Submitted, instantDeserializer) - backoff <- headers.get(RetryHeader.Backoff, durationDeserializer) - originalTopic <- headers.get[String](RetryHeader.OriginalTopic, StringSerde) - } yield for { - ta <- topicAttempt(subscription, topic, originalTopic) - TopicAttempt(originalTopic, attempt) = ta - s <- submitted - b <- backoff - } yield RetryAttempt(originalTopic, attempt, s, b)) - .catchAll(_ => ZIO.none) - } - - private def topicAttempt( - subscription: ConsumerSubscription, - topic: Topic, - originalTopicHeader: Option[String] - ) = - subscription match { - case _: Topics => extractTopicAttempt(group, topic) - case _: TopicPattern => - extractTopicAttemptFromPatternRetryTopic(group, topic, originalTopicHeader) - } - override def retryDecision[E]( retryAttempt: Option[RetryAttempt], record: ConsumerRecord[Chunk[Byte], Chunk[Byte]], error: E, subscription: ConsumerSubscription - )(implicit trace: Trace): URIO[Any, RetryDecision] = currentTime.map(now => { - val nextRetryAttempt = retryAttempt.fold(0)(_.attempt + 1) + )(implicit trace: Trace): URIO[Any, RetryDecision] = Clock.instant.map(now => { + val blockingRetriesBefore = RetryAttempt.maxBlockingAttempts( + NonBlockingRetryHelper.originalTopic(record.topic, group), + retryConfig + ).getOrElse(0) + + // attempt if present contains full number of retries + val nextNonBlockingAttempt = retryAttempt.fold(0)(_.attempt + 1 - blockingRetriesBefore) + val nextRetryAttempt = nextNonBlockingAttempt + blockingRetriesBefore val originalTopic = retryAttempt.fold(record.topic)(_.originalTopic) val retryTopic = subscription match { - case _: TopicPattern => patternRetryTopic(group, nextRetryAttempt) - case _: Topics => fixedRetryTopic(originalTopic, group, nextRetryAttempt) + case _: TopicPattern => patternRetryTopic(group, nextNonBlockingAttempt) + case _: Topics => fixedRetryTopic(originalTopic, group, nextNonBlockingAttempt) } val topicRetryPolicy = policy(record.topic) topicRetryPolicy.intervals - .lift(nextRetryAttempt) + .lift(nextNonBlockingAttempt) .map { backoff => + val attempt = RetryAttempt( + attempt = nextRetryAttempt, + originalTopic = originalTopic, + submittedAt = now, + backoff = backoff + ) topicRetryPolicy.recordMutate( ProducerRecord( topic = retryTopic, value = record.value, key = record.key, partition = None, - headers = record.headers + - (RetryHeader.Submitted -> toChunk(now.toEpochMilli)) + - (RetryHeader.Backoff -> toChunk(backoff.toMillis)) + - (RetryHeader.OriginalTopic -> toChunk(originalTopic)) + - (RetryHeader.RetryAttempt -> toChunk(nextRetryAttempt)) + headers = record.headers ++ RetryAttempt.toHeaders(attempt) ) ) } .fold[RetryDecision](NoMoreRetries)(RetryWith) }) + } - private def toChunk(long: Long): Chunk[Byte] = - Chunk.fromArray(long.toString.getBytes) - - private def toChunk(str: String): Chunk[Byte] = - Chunk.fromArray(str.getBytes) + private[retry] def attemptNumberFromTopic( + subscription: ConsumerSubscription, + topic: Topic, + originalTopicHeader: Option[String], + group: Group + ) = + subscription match { + case _: Topics => extractTopicAttempt(group, topic) + case _: TopicPattern => + extractTopicAttemptFromPatternRetryTopic(group, topic, originalTopicHeader) } - private def extractTopicAttempt[E](group: Group, inputTopic: Topic) = + private def extractTopicAttempt(group: Group, inputTopic: Topic) = inputTopic.split(s"-$group-retry-").toSeq match { case Seq(topic, attempt) if Try(attempt.toInt).isSuccess => Some(TopicAttempt(topic, attempt.toInt)) - case _ => None + case _ => None } - private def extractTopicAttemptFromPatternRetryTopic[E]( + private def extractTopicAttemptFromPatternRetryTopic( group: Group, inputTopic: Topic, originalTopicHeader: Option[String] @@ -166,49 +145,27 @@ object DelayHeaders { val Backoff = "backOffTimeMs" } -object RetryHeader { - val Submitted = "submitTimestamp" - val Backoff = DelayHeaders.Backoff - val OriginalTopic = "GH_OriginalTopic" - val RetryAttempt = "GH_RetryAttempt" -} - -case class RetryAttempt( - originalTopic: Topic, - attempt: RetryAttemptNumber, - submittedAt: Instant, - backoff: Duration -) { - - def sleep(implicit trace: Trace): URIO[GreyhoundMetrics, Unit] = - RetryUtil.sleep(submittedAt, backoff) race reportWaitingInIntervals(every = 60.seconds) - - private def reportWaitingInIntervals(every: Duration) = - report(WaitingForRetry(originalTopic, attempt, submittedAt.toEpochMilli, backoff.toMillis)) - .repeat(spaced(every)) - .unit -} - object RetryUtil { + def sleep(attempt: RetryAttempt)(implicit trace: Trace): URIO[GreyhoundMetrics, Unit] = + sleep(attempt.submittedAt, attempt.backoff) race + report(WaitingForRetry(attempt.originalTopic, attempt.attempt, attempt.submittedAt.toEpochMilli, attempt.backoff.toMillis)) + .repeat(spaced(60.seconds)) + .unit + def sleep(submittedAt: Instant, backoff: Duration)(implicit trace: Trace): URIO[Any, Unit] = { val expiresAt = submittedAt.plus(backoff.asJava) - currentTime + Clock.instant .map(_.isAfter(expiresAt)) .flatMap(expired => if (expired) ZIO.unit else - ZIO.sleep(1.seconds).repeatUntilZIO(_ => currentTime.map(_.isAfter(expiresAt))).unit + ZIO.sleep(1.second).repeatUntilZIO(_ => Clock.instant.map(_.isAfter(expiresAt))).unit ) } } private case class TopicAttempt(originalTopic: Topic, attempt: Int) -object RetryAttempt { - type RetryAttemptNumber = Int - val currentTime = Clock.currentTime(MILLISECONDS).map(Instant.ofEpochMilli) -} - sealed trait RetryDecision object RetryDecision { diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryRecordHandler.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryRecordHandler.scala index a15f1e5b..a6ff6560 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryRecordHandler.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/NonBlockingRetryRecordHandler.scala @@ -31,11 +31,12 @@ private[retry] object NonBlockingRetryRecordHandler { retryConfig: RetryConfig, subscription: ConsumerSubscription, nonBlockingRetryHelper: NonBlockingRetryHelper, + groupId: Group, awaitShutdown: TopicPartition => UIO[AwaitShutdown] )(implicit evK: K <:< Chunk[Byte], evV: V <:< Chunk[Byte]): NonBlockingRetryRecordHandler[V, K, R] = new NonBlockingRetryRecordHandler[V, K, R] { override def handle(record: ConsumerRecord[K, V]): ZIO[GreyhoundMetrics with R, Nothing, Any] = { - nonBlockingRetryHelper.retryAttempt(record.topic, record.headers, subscription).flatMap { retryAttempt => + RetryAttempt.extract(record.headers, record.topic, groupId, subscription, Some(retryConfig)).flatMap { retryAttempt => maybeDelayRetry(record, retryAttempt) *> handler.handle(record).catchAll { case Right(_: NonRetriableException) => ZIO.unit @@ -56,7 +57,7 @@ private[retry] object NonBlockingRetryRecordHandler { WaitingBeforeRetry(record.topic, retryAttempt, record.partition, record.offset, correlationId) ) *> awaitShutdown(record.topicPartition) - .flatMap(_.interruptOnShutdown(retryAttempt.sleep)) + .flatMap(_.interruptOnShutdown(RetryUtil.sleep(retryAttempt))) .reporting(r => DoneWaitingBeforeRetry(record.topic, record.partition, record.offset, retryAttempt, r.duration, r.failed, correlationId) ) @@ -74,7 +75,7 @@ private[retry] object NonBlockingRetryRecordHandler { override def handleAfterBlockingFailed( record: ConsumerRecord[K, V] ): ZIO[GreyhoundMetrics with R, Nothing, Any] = { - nonBlockingRetryHelper.retryAttempt(record.topic, record.headers, subscription).flatMap { retryAttempt => + RetryAttempt.extract(record.headers, record.topic, groupId, subscription, Some(retryConfig)).flatMap { retryAttempt => maybeRetry(retryAttempt, BlockingHandlerFailed, record) } } diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryAttempt.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryAttempt.scala new file mode 100644 index 00000000..3fa3cba8 --- /dev/null +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryAttempt.scala @@ -0,0 +1,98 @@ +package com.wixpress.dst.greyhound.core.consumer.retry + +import com.wixpress.dst.greyhound.core.Serdes.StringSerde +import com.wixpress.dst.greyhound.core._ +import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription +import com.wixpress.dst.greyhound.core.consumer.retry.NonBlockingRetryHelper.attemptNumberFromTopic +import com.wixpress.dst.greyhound.core.consumer.retry.RetryAttempt.RetryAttemptNumber +import zio._ + +import java.time.Instant + +/** + * Description of a retry attempt + * @param attempt contains which attempt is it, starting from 0 including blocking and non-blocking attempts + */ +case class RetryAttempt( + originalTopic: Topic, + attempt: RetryAttemptNumber, + submittedAt: Instant, + backoff: Duration +) + +object RetryHeader { + val Submitted = "submitTimestamp" + val Backoff = DelayHeaders.Backoff + val OriginalTopic = "GH_OriginalTopic" + val RetryAttempt = "GH_RetryAttempt" +} + +object RetryAttempt { + type RetryAttemptNumber = Int + + private def toChunk(str: String): Chunk[Byte] = Chunk.fromArray(str.getBytes) + + def toHeaders(attempt: RetryAttempt): Headers = Headers( + RetryHeader.Submitted -> toChunk(attempt.submittedAt.toEpochMilli.toString), + RetryHeader.Backoff -> toChunk(attempt.backoff.toMillis.toString), + RetryHeader.OriginalTopic -> toChunk(attempt.originalTopic), + RetryHeader.RetryAttempt -> toChunk(attempt.attempt.toString), + ) + + private case class RetryAttemptHeaders( + originalTopic: Option[Topic], + attempt: Option[RetryAttemptNumber], + submittedAt: Option[Instant], + backoff: Option[Duration] + ) + + private def fromHeaders(headers: Headers): Task[RetryAttemptHeaders] = + for { + submitted <- headers.get(RetryHeader.Submitted, instantDeserializer) + backoff <- headers.get(RetryHeader.Backoff, durationDeserializer) + topic <- headers.get[String](RetryHeader.OriginalTopic, StringSerde) + attempt <- headers.get(RetryHeader.RetryAttempt, longDeserializer) + } yield RetryAttemptHeaders(topic, attempt.map(_.toInt), submitted, backoff) + + /** @return None on infinite blocking retries */ + def maxBlockingAttempts(topic: Topic, retryConfig: Option[RetryConfig]): Option[Int] = + retryConfig.map(_.blockingBackoffs(topic)()).fold(Option(0)) { + case finite if finite.hasDefiniteSize => Some(finite.size) + case _ => None + } + + /** @return None on infinite retries */ + def maxOverallAttempts(topic: Topic, retryConfig: Option[RetryConfig]): Option[Int] = + maxBlockingAttempts(topic, retryConfig).map { + _ + retryConfig.fold(0)(_.nonBlockingBackoffs(topic).length) + } + + def extract( + headers: Headers, + topic: Topic, + group: Group, + subscription: ConsumerSubscription, + retryConfig: Option[RetryConfig], + )(implicit trace: Trace): UIO[Option[RetryAttempt]] = { + + def maybeNonBlockingAttempt(hs: RetryAttemptHeaders): Option[RetryAttempt] = + for { + submitted <- hs.submittedAt + backoff <- hs.backoff + TopicAttempt(originalTopic, attempt) <- attemptNumberFromTopic(subscription, topic, hs.originalTopic, group) + blockingRetries = maxBlockingAttempts(originalTopic, retryConfig).getOrElse(0) + } yield RetryAttempt(originalTopic, blockingRetries + attempt, submitted, backoff) + + def maybeBlockingAttempt(hs: RetryAttemptHeaders): Option[RetryAttempt] = + for { + submitted <- hs.submittedAt + backoff <- hs.backoff + originalTopic <- hs.originalTopic if originalTopic == topic + attempt <- hs.attempt + } yield RetryAttempt(originalTopic, attempt, submitted, backoff) + + fromHeaders(headers).map { hs => + maybeNonBlockingAttempt(hs) orElse maybeBlockingAttempt(hs) + } + }.catchAll(_ => ZIO.none) +} diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryRecordHandler.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryRecordHandler.scala index af0749c6..8ee8ab9f 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryRecordHandler.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryRecordHandler.scala @@ -33,7 +33,7 @@ object RetryRecordHandler { ): RecordHandler[R with R2 with GreyhoundMetrics, Nothing, K, V] = { val nonBlockingHandler = - NonBlockingRetryRecordHandler(handler, producer, retryConfig, subscription, nonBlockingRetryHelper, awaitShutdown) + NonBlockingRetryRecordHandler(handler, producer, retryConfig, subscription, nonBlockingRetryHelper, groupId, awaitShutdown) val blockingHandler = BlockingRetryRecordHandler(groupId, handler, retryConfig, blockingState, nonBlockingHandler, awaitShutdown) val blockingAndNonBlockingHandler = BlockingAndNonBlockingRetryRecordHandler(groupId, blockingHandler, nonBlockingHandler) @@ -55,15 +55,4 @@ object RetryRecordHandler { record.headers.get[String](key, StringSerde).catchAll(_ => ZIO.none) } -object ZIOHelper { - def foreachWhile[R, E, A](as: Iterable[A])(f: A => ZIO[R, E, LastHandleResult])(implicit trace: Trace): ZIO[R, E, LastHandleResult] = - ZIO.succeed(as.iterator).flatMap { i => - def loop: ZIO[R, E, LastHandleResult] = - if (i.hasNext) f(i.next).flatMap(result => if (result.shouldContinue) loop else ZIO.succeed(result)) - else ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false)) - - loop - } -} - case class LastHandleResult(lastHandleSucceeded: Boolean, shouldContinue: Boolean) diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryAttemptTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryAttemptTest.scala new file mode 100644 index 00000000..9e27c10b --- /dev/null +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryAttemptTest.scala @@ -0,0 +1,94 @@ +package com.wixpress.dst.greyhound.core.consumer.retry + +import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription +import com.wixpress.dst.greyhound.core.testkit.BaseTest +import zio.test.TestEnvironment + +import java.time.{Duration, Instant} +import scala.util.Random +import scala.concurrent.duration._ + +class RetryAttemptTest extends BaseTest[TestEnvironment] { + + "RetryAttempt.extract" should { + "deserialize attempt from headers for blocking retries" in { + val attempt = randomRetryAttempt + val headers = RetryAttempt.toHeaders(attempt) + val subscription = ConsumerSubscription.Topics(Set(attempt.originalTopic)) + for (result <- RetryAttempt.extract(headers, attempt.originalTopic, randomStr, subscription, None)) + yield result must beSome(attempt) + } + "deserialize attempt from headers and topic for non-blocking retries" in { + val attempt = randomRetryAttempt + // topic and attempt must be extracted from retryTopic + val headers = RetryAttempt.toHeaders(attempt.copy(originalTopic = "", attempt = -1)) + val subscription = ConsumerSubscription.Topics(Set(attempt.originalTopic)) + val group = randomStr + val retryTopic = NonBlockingRetryHelper.fixedRetryTopic(attempt.originalTopic, group, attempt.attempt) + for (result <- RetryAttempt.extract(headers, retryTopic, group, subscription, None)) + yield result must beSome(attempt) + } + "deserialize attempt for non-blocking retry after blocking retries" in { + val attempt = randomRetryAttempt + val headers = RetryAttempt.toHeaders(attempt) + val subscription = ConsumerSubscription.Topics(Set(attempt.originalTopic)) + val group = randomStr + val retries = RetryConfig.blockingFollowedByNonBlockingRetry( + blockingBackoffs = 1.milli :: 1.second :: Nil, + nonBlockingBackoffs = 5.minutes :: Nil, + ) + val retryTopic = NonBlockingRetryHelper.fixedRetryTopic(attempt.originalTopic, group, attempt.attempt) + for (result <- RetryAttempt.extract(headers, retryTopic, group, subscription, Some(retries))) + yield result must beSome(attempt.copy(attempt = attempt.attempt + 2)) // with 2 blocking retries before + } + "In case incorrect originalTopic header propagated from a different consumer group, ignore it," + + "do NOT consider it as if it's a non-blocking retry" in { + val attempt = propagatedRetryAttempt + val headers = RetryAttempt.toHeaders(attempt) + val currentTopic = "relevant-topic" + val subscription = ConsumerSubscription.Topics(Set(currentTopic)) + for (result <- RetryAttempt.extract(headers, currentTopic, randomStr, subscription, None)) + yield result must beNone + } + } + + "RetryAttempt.maxOverallAttempts" should { + "return 0 if no retries configured" in { + RetryAttempt.maxOverallAttempts(randomStr, None) must beSome(0) + } + "return max attempts for blocking retries" in { + val config = RetryConfig.finiteBlockingRetry(1.milli, 1.second) + RetryAttempt.maxOverallAttempts(randomStr, Some(config)) must beSome(2) + } + "return max attempts for non-blocking retries" in { + val config = RetryConfig.nonBlockingRetry(1.milli, 1.second, 5.minutes) + RetryAttempt.maxOverallAttempts(randomStr, Some(config)) must beSome(3) + } + "return max attempts for blocking retries followed by non-blocking" in { + val config = RetryConfig.blockingFollowedByNonBlockingRetry(1.milli :: 2.seconds :: Nil, 1.minute :: Nil) + RetryAttempt.maxOverallAttempts(randomStr, Some(config)) must beSome(3) + } + "return None for infinite blocking retries" in { + val config = RetryConfig.infiniteBlockingRetry(1.milli) + RetryAttempt.maxOverallAttempts(randomStr, Some(config)) must beNone + } + } + + override def env = testEnvironment + + private def randomStr = Random.alphanumeric.take(10).mkString + + private def randomRetryAttempt = RetryAttempt( + originalTopic = randomStr, + attempt = Random.nextInt(1000), + submittedAt = Instant.ofEpochMilli(math.abs(Random.nextLong())), + backoff = Duration.ofMillis(Random.nextInt(100000)) + ) + + private def propagatedRetryAttempt = RetryAttempt( + originalTopic = "some-other-topic", + attempt = Random.nextInt(1000), + submittedAt = Instant.ofEpochMilli(math.abs(Random.nextLong())), + backoff = Duration.ofMillis(Random.nextInt(100000)) + ) +} diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConsumerRecordHandlerTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConsumerRecordHandlerTest.scala index c713fb12..0797ba25 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConsumerRecordHandlerTest.scala +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConsumerRecordHandlerTest.scala @@ -4,7 +4,7 @@ import java.time.Instant import com.wixpress.dst.greyhound.core.Serdes._ import com.wixpress.dst.greyhound.core._ import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription.Topics -import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, ConsumerSubscription, RecordHandler} +import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, RecordHandler} import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState.{Blocked, Blocking => InternalBlocking, IgnoringAll, IgnoringOnce} import com.wixpress.dst.greyhound.core.consumer.retry.RetryConsumerRecordHandlerTest.{offset, partition, _} import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandlerMetric.{BlockingIgnoredForAllFor, BlockingIgnoredOnceFor, BlockingRetryHandlerInvocationFailed, NoRetryOnNonRetryableFailure} @@ -21,8 +21,6 @@ import zio.Random.{nextBytes, nextIntBounded} import zio.managed.UManaged import zio.test.TestClock -import scala.concurrent.TimeoutException - class RetryConsumerRecordHandlerTest extends BaseTest[TestClock with TestMetrics] { override def env: UManaged[ZEnvironment[TestClock with TestMetrics]] = @@ -52,9 +50,6 @@ class RetryConsumerRecordHandlerTest extends BaseTest[TestClock with TestMetrics _ <- retryHandler.handle(ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L)) record <- producer.records.take now <- currentTime - retryAttempt <- IntSerde.serialize(retryTopic, 0) - submittedAt <- InstantSerde.serialize(retryTopic, now) - backoff <- DurationSerde.serialize(retryTopic, 1.second) } yield { record === ProducerRecord( @@ -62,7 +57,7 @@ class RetryConsumerRecordHandlerTest extends BaseTest[TestClock with TestMetrics value, Some(key), partition = None, - headers = Headers("retry-attempt" -> retryAttempt, "retry-submitted-at" -> submittedAt, "retry-backoff" -> backoff) + headers = RetryAttempt.toHeaders(RetryAttempt(topic, 0, now, 1.second)) ) } } @@ -71,7 +66,8 @@ class RetryConsumerRecordHandlerTest extends BaseTest[TestClock with TestMetrics for { producer <- FakeProducer.make topic <- randomTopicName - retryTopic = s"$topic-retry" + attempt = 0 + retryTopic = NonBlockingRetryHelper.fixedRetryTopic(topic, group, attempt) executionTime <- Promise.make[Nothing, Instant] handler = RecordHandler[Clock, HandlerError, Chunk[Byte], Chunk[Byte]] { _ => currentTime.flatMap(executionTime.succeed) } blockingState <- Ref.make[Map[BlockingTarget, BlockingState]](Map.empty) @@ -86,10 +82,7 @@ class RetryConsumerRecordHandlerTest extends BaseTest[TestClock with TestMetrics ) value <- bytes begin <- currentTime - retryAttempt <- IntSerde.serialize(retryTopic, 0) - submittedAt <- InstantSerde.serialize(retryTopic, begin) - backoff <- DurationSerde.serialize(retryTopic, 1.second) - headers = Headers("retry-attempt" -> retryAttempt, "retry-submitted-at" -> submittedAt, "retry-backoff" -> backoff) + headers = RetryAttempt.toHeaders(RetryAttempt(topic, attempt, begin, 1.second)) _ <- retryHandler.handle(ConsumerRecord(retryTopic, partition, offset, headers, None, value, 0L, 0L, 0L)).fork _ <- TestClock.adjust(1.second).repeat(Schedule.once) end <- executionTime.await.disconnect.timeoutFail(TimeoutWaitingForAssertion)(5.seconds) @@ -404,7 +397,8 @@ class RetryConsumerRecordHandlerTest extends BaseTest[TestClock with TestMetrics producer <- FakeProducer.make topic <- randomTopicName blockingState <- Ref.make[Map[BlockingTarget, BlockingState]](Map.empty) - retryHelper = alwaysBackOffRetryHelper(3.seconds) + retryHelper = FakeRetryHelper(topic) + now <- Clock.instant handling <- AwaitShutdown.makeManaged.flatMap { awaitShutdown => val retryHandler = RetryRecordHandler.withRetries( group, @@ -416,11 +410,12 @@ class RetryConsumerRecordHandlerTest extends BaseTest[TestClock with TestMetrics retryHelper, awaitShutdown = _ => ZIO.succeed(awaitShutdown) ) + val headers = RetryAttempt.toHeaders(RetryAttempt(topic, 0, now, 3.seconds)) for { key <- bytes value <- bytes handling <- retryHandler - .handle(ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L)) + .handle(ConsumerRecord(topic, partition, offset, headers, Some(key), value, 0L, 0L, 0L)) .forkDaemon } yield handling } @@ -534,18 +529,6 @@ object RetryConsumerRecordHandlerTest { def randomTopicName = randomStr.map(suffix => s"some-topic-$suffix") val cause = new RuntimeException("cause") - - def alwaysBackOffRetryHelper(backoff: Duration) = { - new FakeNonBlockingRetryHelper { - override val topic: Topic = "" - - override def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)( - implicit trace: Trace - ): UIO[Option[RetryAttempt]] = ZIO.succeed( - Some(RetryAttempt(topic, 1, Instant.now, backoff)) - ) - } - } } object TimeoutWaitingForAssertion extends RuntimeException diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/testkit/FakeRetryHelper.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/testkit/FakeRetryHelper.scala index cee13948..619ada36 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/testkit/FakeRetryHelper.scala +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/testkit/FakeRetryHelper.scala @@ -1,35 +1,21 @@ package com.wixpress.dst.greyhound.core.testkit import java.time.Instant -import java.util.concurrent.TimeUnit.MILLISECONDS - -import com.wixpress.dst.greyhound.core.Serdes._ import com.wixpress.dst.greyhound.core._ import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, ConsumerSubscription} import com.wixpress.dst.greyhound.core.consumer.retry.RetryDecision.{NoMoreRetries, RetryWith} -import com.wixpress.dst.greyhound.core.consumer.retry.{BlockingHandlerFailed, NonBlockingRetryHelper, RetryAttempt, RetryDecision} +import com.wixpress.dst.greyhound.core.consumer.retry.{BlockingHandlerFailed, NonBlockingRetryHelper, RetryAttempt, RetryDecision, RetryHeader} import com.wixpress.dst.greyhound.core.producer.ProducerRecord import com.wixpress.dst.greyhound.core.testkit.FakeRetryHelper._ import zio._ import zio.Clock -import zio.Clock - trait FakeNonBlockingRetryHelper extends NonBlockingRetryHelper { val topic: Topic override def retryTopicsFor(originalTopic: Topic): Set[Topic] = Set(s"$originalTopic-retry") - override def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)( - implicit trace: Trace - ): UIO[Option[RetryAttempt]] = - (for { - attempt <- headers.get(Header.Attempt, IntSerde) - submittedAt <- headers.get(Header.SubmittedAt, InstantSerde) - backoff <- headers.get(Header.Backoff, DurationSerde) - } yield retryAttemptInternal(topic, attempt, submittedAt, backoff)).orElse(ZIO.none) - override def retryDecision[E]( retryAttempt: Option[RetryAttempt], record: ConsumerRecord[Chunk[Byte], Chunk[Byte]], @@ -38,35 +24,23 @@ trait FakeNonBlockingRetryHelper extends NonBlockingRetryHelper { )(implicit trace: Trace): URIO[Any, RetryDecision] = error match { case RetriableError | BlockingHandlerFailed => - currentTime.flatMap(now => - recordFrom(now, retryAttempt, record) - .fold(_ => NoMoreRetries, RetryWith) + currentTime.map(now => + RetryWith(recordFrom(now, retryAttempt, record)) ) case NonRetriableError => ZIO.succeed(NoMoreRetries) } - private def retryAttemptInternal(topic: Topic, attempt: Option[Int], submittedAt: Option[Instant], backoff: Option[Duration]) = - for { - a <- attempt - s <- submittedAt - b <- backoff - } yield RetryAttempt(topic, a, s, b) - private def recordFrom(now: Instant, retryAttempt: Option[RetryAttempt], record: ConsumerRecord[Chunk[Byte], Chunk[Byte]])( implicit trace: Trace ) = { val nextRetryAttempt = retryAttempt.fold(0)(_.attempt + 1) - for { - retryAttempt <- IntSerde.serialize(topic, nextRetryAttempt) - submittedAt <- InstantSerde.serialize(topic, now) - backoff <- DurationSerde.serialize(topic, 1.second) - } yield ProducerRecord( + ProducerRecord( topic = s"$topic-retry", value = record.value, key = record.key, partition = None, - headers = Headers(Header.Attempt -> retryAttempt, Header.SubmittedAt -> submittedAt, Header.Backoff -> backoff) + headers = RetryAttempt.toHeaders(RetryAttempt(topic, nextRetryAttempt, now, 1.second)) ) } } @@ -75,13 +49,8 @@ case class FakeRetryHelper(topic: Topic) extends FakeNonBlockingRetryHelper object FakeRetryHelper { implicit private val trace = Trace.empty - object Header { - val Attempt = "retry-attempt" - val SubmittedAt = "retry-submitted-at" - val Backoff = "retry-backoff" - } - val currentTime = Clock.currentTime(MILLISECONDS).map(Instant.ofEpochMilli) + val currentTime: UIO[Instant] = Clock.instant } sealed trait HandlerError