Skip to content

Commit

Permalink
Blocking retries attempts tracking fix + fix transitive header bug (#…
Browse files Browse the repository at this point in the history
…33827)

* Revert "Revert "Blocking retries attempts tracking fix" (#33818)"

This reverts commit eb859fcf70860c06a0bd0492b746ed3ff00bc8ac.

* fix bug on retryAttempt resolution - ignore propagated headers

* CR change

* fix test. change retry name to correctly structured one

GitOrigin-RevId: 53fb7cf3efa315f496a00ade574397e30f20eeaf
  • Loading branch information
natansil authored and wix-oss committed Sep 11, 2023
1 parent 271302e commit b90bbfa
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package com.wixpress.dst.greyhound.core.consumer.retry
import java.util.concurrent.TimeUnit
import com.wixpress.dst.greyhound.core.{Group, TopicPartition}
import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, RecordHandler}
import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState.{Blocked, Blocking => InternalBlocking, IgnoringOnce}
import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState.{Blocked, IgnoringOnce, Blocking => InternalBlocking}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandlerMetric.{BlockingRetryHandlerInvocationFailed, DoneBlockingBeforeRetry, NoRetryOnNonRetryableFailure}
import com.wixpress.dst.greyhound.core.consumer.retry.ZIOHelper.foreachWhile
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics.report
import com.wixpress.dst.greyhound.core.zioutils.AwaitShutdown
Expand All @@ -31,7 +30,7 @@ private[retry] object BlockingRetryRecordHandler {
override def handle(record: ConsumerRecord[K, V])(implicit trace: Trace): ZIO[GreyhoundMetrics with R, Nothing, LastHandleResult] = {
val topicPartition = TopicPartition(record.topic, record.partition)

def pollBlockingStateWithSuspensions(interval: Duration, start: Long): URIO[GreyhoundMetrics, PollResult] = {
def pollBlockingStateWithSuspensions(record: ConsumerRecord[K, V], interval: Duration, start: Long): URIO[GreyhoundMetrics, PollResult] = {
for {
shouldBlock <- blockingStateResolver.resolve(record)
shouldPollAgain <-
Expand All @@ -43,14 +42,14 @@ private[retry] object BlockingRetryRecordHandler {
} yield shouldPollAgain
}

def blockOnErrorFor(interval: Duration) = {
def blockOnErrorFor(record: ConsumerRecord[K, V], interval: Duration) = {
for {
start <- currentTime(TimeUnit.MILLISECONDS)
continueBlocking <-
if (interval.toMillis > 100L) {
awaitShutdown(record.topicPartition).flatMap(
_.interruptOnShutdown(
pollBlockingStateWithSuspensions(interval, start).repeatWhile(result => result.pollAgain).map(_.blockHandling)
pollBlockingStateWithSuspensions(record, interval, start).repeatWhile(result => result.pollAgain).map(_.blockHandling)
).reporting(r => DoneBlockingBeforeRetry(record.topic, record.partition, record.offset, r.duration, r.failed))
)
} else {
Expand All @@ -63,6 +62,7 @@ private[retry] object BlockingRetryRecordHandler {
}

def handleAndMaybeBlockOnErrorFor(
record: ConsumerRecord[K, V],
interval: Option[Duration]
): ZIO[R with GreyhoundMetrics, Nothing, LastHandleResult] = {
handler.handle(record).map(_ => LastHandleResult(lastHandleSucceeded = true, shouldContinue = false)).catchAll {
Expand All @@ -73,7 +73,7 @@ private[retry] object BlockingRetryRecordHandler {
case error =>
interval
.map { interval =>
report(BlockingRetryHandlerInvocationFailed(topicPartition, record.offset, error.toString)) *> blockOnErrorFor(interval)
report(BlockingRetryHandlerInvocationFailed(topicPartition, record.offset, error.toString)) *> blockOnErrorFor(record, interval)
}
.getOrElse(ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false)))
}
Expand All @@ -96,13 +96,44 @@ private[retry] object BlockingRetryRecordHandler {
} else {
val durationsIncludingForInvocationWithNoErrorHandling = retryConfig.blockingBackoffs(record.topic)().map(Some(_)) :+ None
for {
result <- foreachWhile(durationsIncludingForInvocationWithNoErrorHandling) { interval => handleAndMaybeBlockOnErrorFor(interval) }
_ <- maybeBackToStateBlocking
result <- retryEvery(record, durationsIncludingForInvocationWithNoErrorHandling) { (rec, interval) =>
handleAndMaybeBlockOnErrorFor(rec, interval)
}
_ <- maybeBackToStateBlocking
} yield result
}
}
}

private def retryEvery[K, V, R, E](record: ConsumerRecord[K, V], as: Iterable[Option[Duration]])(
f: (ConsumerRecord[K, V], Option[Duration]) => ZIO[R, E, LastHandleResult]
)(implicit trace: Trace): ZIO[R, E, LastHandleResult] = {
ZIO.succeed(as.iterator).flatMap { i =>
def loop(retryAttempt: Option[RetryAttempt]): ZIO[R, E, LastHandleResult] =
if (i.hasNext) {
val nextDelay = i.next
val recordWithAttempt = retryAttempt.fold(record) { attempt =>
record.copy(headers = record.headers ++ RetryAttempt.toHeaders(attempt))
}
f(recordWithAttempt, nextDelay).flatMap { result =>
if (result.shouldContinue) Clock.instant.flatMap { now =>
val nextAttempt = RetryAttempt(
originalTopic = record.topic,
attempt = retryAttempt.fold(0)(_.attempt + 1),
submittedAt = now,
backoff = nextDelay getOrElse Duration.Zero
)
loop(Some(nextAttempt))
}
else ZIO.succeed(result)
}
}
else ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false))

loop(None)
}
}

private def handleNonRetriable[K, V, E, R](record: ConsumerRecord[K, V], topicPartition: TopicPartition, cause: Exception) =
report(NoRetryOnNonRetryableFailure(topicPartition, record.offset, cause))
.as(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,31 @@
package com.wixpress.dst.greyhound.core.consumer.retry

import java.time.{Duration => JavaDuration, Instant}
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.regex.Pattern
import com.wixpress.dst.greyhound.core.Serdes.StringSerde
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription.{TopicPattern, Topics}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryAttempt.{currentTime, RetryAttemptNumber}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryDecision.{NoMoreRetries, RetryWith}
import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, ConsumerSubscription}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryDecision.{NoMoreRetries, RetryWith}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandlerMetric.WaitingForRetry
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics.report
import com.wixpress.dst.greyhound.core.producer.ProducerRecord
import com.wixpress.dst.greyhound.core.{durationDeserializer, instantDeserializer, Group, Headers, Topic}
import zio.Clock
import zio.Duration
import com.wixpress.dst.greyhound.core.{Group, Topic}
import zio.Schedule.spaced
import zio.{Chunk, UIO, URIO, _}
import zio.{Chunk, Clock, Duration, URIO, _}

import java.time.Instant
import java.util.regex.Pattern
import scala.util.Try

trait NonBlockingRetryHelper {
def retryTopicsFor(originalTopic: Topic): Set[Topic]

def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)(
implicit trace: Trace
): UIO[Option[RetryAttempt]]

def retryDecision[E](
retryAttempt: Option[RetryAttempt],
record: ConsumerRecord[Chunk[Byte], Chunk[Byte]],
error: E,
subscription: ConsumerSubscription
)(implicit trace: Trace): URIO[Any, RetryDecision]

def retrySteps = retryTopicsFor("").size
def retrySteps: Int = retryTopicsFor("").size
}

object NonBlockingRetryHelper {
Expand All @@ -49,82 +40,70 @@ object NonBlockingRetryHelper {
override def retryTopicsFor(topic: Topic): Set[Topic] =
policy(topic).intervals.indices.foldLeft(Set.empty[String])((acc, attempt) => acc + s"$topic-$group-retry-$attempt")

override def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)(
implicit trace: Trace
): UIO[Option[RetryAttempt]] = {
(for {
submitted <- headers.get(RetryHeader.Submitted, instantDeserializer)
backoff <- headers.get(RetryHeader.Backoff, durationDeserializer)
originalTopic <- headers.get[String](RetryHeader.OriginalTopic, StringSerde)
} yield for {
ta <- topicAttempt(subscription, topic, originalTopic)
TopicAttempt(originalTopic, attempt) = ta
s <- submitted
b <- backoff
} yield RetryAttempt(originalTopic, attempt, s, b))
.catchAll(_ => ZIO.none)
}

private def topicAttempt(
subscription: ConsumerSubscription,
topic: Topic,
originalTopicHeader: Option[String]
) =
subscription match {
case _: Topics => extractTopicAttempt(group, topic)
case _: TopicPattern =>
extractTopicAttemptFromPatternRetryTopic(group, topic, originalTopicHeader)
}

override def retryDecision[E](
retryAttempt: Option[RetryAttempt],
record: ConsumerRecord[Chunk[Byte], Chunk[Byte]],
error: E,
subscription: ConsumerSubscription
)(implicit trace: Trace): URIO[Any, RetryDecision] = currentTime.map(now => {
val nextRetryAttempt = retryAttempt.fold(0)(_.attempt + 1)
)(implicit trace: Trace): URIO[Any, RetryDecision] = Clock.instant.map(now => {
val blockingRetriesBefore = RetryAttempt.maxBlockingAttempts(
NonBlockingRetryHelper.originalTopic(record.topic, group),
retryConfig
).getOrElse(0)

// attempt if present contains full number of retries
val nextNonBlockingAttempt = retryAttempt.fold(0)(_.attempt + 1 - blockingRetriesBefore)
val nextRetryAttempt = nextNonBlockingAttempt + blockingRetriesBefore
val originalTopic = retryAttempt.fold(record.topic)(_.originalTopic)
val retryTopic = subscription match {
case _: TopicPattern => patternRetryTopic(group, nextRetryAttempt)
case _: Topics => fixedRetryTopic(originalTopic, group, nextRetryAttempt)
case _: TopicPattern => patternRetryTopic(group, nextNonBlockingAttempt)
case _: Topics => fixedRetryTopic(originalTopic, group, nextNonBlockingAttempt)
}

val topicRetryPolicy = policy(record.topic)
topicRetryPolicy.intervals
.lift(nextRetryAttempt)
.lift(nextNonBlockingAttempt)
.map { backoff =>
val attempt = RetryAttempt(
attempt = nextRetryAttempt,
originalTopic = originalTopic,
submittedAt = now,
backoff = backoff
)
topicRetryPolicy.recordMutate(
ProducerRecord(
topic = retryTopic,
value = record.value,
key = record.key,
partition = None,
headers = record.headers +
(RetryHeader.Submitted -> toChunk(now.toEpochMilli)) +
(RetryHeader.Backoff -> toChunk(backoff.toMillis)) +
(RetryHeader.OriginalTopic -> toChunk(originalTopic)) +
(RetryHeader.RetryAttempt -> toChunk(nextRetryAttempt))
headers = record.headers ++ RetryAttempt.toHeaders(attempt)
)
)
}
.fold[RetryDecision](NoMoreRetries)(RetryWith)
})
}

private def toChunk(long: Long): Chunk[Byte] =
Chunk.fromArray(long.toString.getBytes)

private def toChunk(str: String): Chunk[Byte] =
Chunk.fromArray(str.getBytes)
private[retry] def attemptNumberFromTopic(
subscription: ConsumerSubscription,
topic: Topic,
originalTopicHeader: Option[String],
group: Group
) =
subscription match {
case _: Topics => extractTopicAttempt(group, topic)
case _: TopicPattern =>
extractTopicAttemptFromPatternRetryTopic(group, topic, originalTopicHeader)
}

private def extractTopicAttempt[E](group: Group, inputTopic: Topic) =
private def extractTopicAttempt(group: Group, inputTopic: Topic) =
inputTopic.split(s"-$group-retry-").toSeq match {
case Seq(topic, attempt) if Try(attempt.toInt).isSuccess =>
Some(TopicAttempt(topic, attempt.toInt))
case _ => None
case _ => None
}

private def extractTopicAttemptFromPatternRetryTopic[E](
private def extractTopicAttemptFromPatternRetryTopic(
group: Group,
inputTopic: Topic,
originalTopicHeader: Option[String]
Expand Down Expand Up @@ -166,49 +145,27 @@ object DelayHeaders {
val Backoff = "backOffTimeMs"
}

object RetryHeader {
val Submitted = "submitTimestamp"
val Backoff = DelayHeaders.Backoff
val OriginalTopic = "GH_OriginalTopic"
val RetryAttempt = "GH_RetryAttempt"
}

case class RetryAttempt(
originalTopic: Topic,
attempt: RetryAttemptNumber,
submittedAt: Instant,
backoff: Duration
) {

def sleep(implicit trace: Trace): URIO[GreyhoundMetrics, Unit] =
RetryUtil.sleep(submittedAt, backoff) race reportWaitingInIntervals(every = 60.seconds)

private def reportWaitingInIntervals(every: Duration) =
report(WaitingForRetry(originalTopic, attempt, submittedAt.toEpochMilli, backoff.toMillis))
.repeat(spaced(every))
.unit
}

object RetryUtil {
def sleep(attempt: RetryAttempt)(implicit trace: Trace): URIO[GreyhoundMetrics, Unit] =
sleep(attempt.submittedAt, attempt.backoff) race
report(WaitingForRetry(attempt.originalTopic, attempt.attempt, attempt.submittedAt.toEpochMilli, attempt.backoff.toMillis))
.repeat(spaced(60.seconds))
.unit

def sleep(submittedAt: Instant, backoff: Duration)(implicit trace: Trace): URIO[Any, Unit] = {
val expiresAt = submittedAt.plus(backoff.asJava)
currentTime
Clock.instant
.map(_.isAfter(expiresAt))
.flatMap(expired =>
if (expired) ZIO.unit
else
ZIO.sleep(1.seconds).repeatUntilZIO(_ => currentTime.map(_.isAfter(expiresAt))).unit
ZIO.sleep(1.second).repeatUntilZIO(_ => Clock.instant.map(_.isAfter(expiresAt))).unit
)
}
}

private case class TopicAttempt(originalTopic: Topic, attempt: Int)

object RetryAttempt {
type RetryAttemptNumber = Int
val currentTime = Clock.currentTime(MILLISECONDS).map(Instant.ofEpochMilli)
}

sealed trait RetryDecision

object RetryDecision {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ private[retry] object NonBlockingRetryRecordHandler {
retryConfig: RetryConfig,
subscription: ConsumerSubscription,
nonBlockingRetryHelper: NonBlockingRetryHelper,
groupId: Group,
awaitShutdown: TopicPartition => UIO[AwaitShutdown]
)(implicit evK: K <:< Chunk[Byte], evV: V <:< Chunk[Byte]): NonBlockingRetryRecordHandler[V, K, R] =
new NonBlockingRetryRecordHandler[V, K, R] {
override def handle(record: ConsumerRecord[K, V]): ZIO[GreyhoundMetrics with R, Nothing, Any] = {
nonBlockingRetryHelper.retryAttempt(record.topic, record.headers, subscription).flatMap { retryAttempt =>
RetryAttempt.extract(record.headers, record.topic, groupId, subscription, Some(retryConfig)).flatMap { retryAttempt =>
maybeDelayRetry(record, retryAttempt) *>
handler.handle(record).catchAll {
case Right(_: NonRetriableException) => ZIO.unit
Expand All @@ -56,7 +57,7 @@ private[retry] object NonBlockingRetryRecordHandler {
WaitingBeforeRetry(record.topic, retryAttempt, record.partition, record.offset, correlationId)
) *>
awaitShutdown(record.topicPartition)
.flatMap(_.interruptOnShutdown(retryAttempt.sleep))
.flatMap(_.interruptOnShutdown(RetryUtil.sleep(retryAttempt)))
.reporting(r =>
DoneWaitingBeforeRetry(record.topic, record.partition, record.offset, retryAttempt, r.duration, r.failed, correlationId)
)
Expand All @@ -74,7 +75,7 @@ private[retry] object NonBlockingRetryRecordHandler {
override def handleAfterBlockingFailed(
record: ConsumerRecord[K, V]
): ZIO[GreyhoundMetrics with R, Nothing, Any] = {
nonBlockingRetryHelper.retryAttempt(record.topic, record.headers, subscription).flatMap { retryAttempt =>
RetryAttempt.extract(record.headers, record.topic, groupId, subscription, Some(retryConfig)).flatMap { retryAttempt =>
maybeRetry(retryAttempt, BlockingHandlerFailed, record)
}
}
Expand Down
Loading

0 comments on commit b90bbfa

Please sign in to comment.