Skip to content

Commit

Permalink
Ds scalafmt (#32485)
Browse files Browse the repository at this point in the history
* format: data streams/

* format: iptf/

GitOrigin-RevId: a31eb0dc0eaff014511665e52497dea67a3f9ff1
  • Loading branch information
leonbur authored and wix-oss committed Sep 23, 2023
1 parent 385bb84 commit 743c6a6
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.wixpress.dst.greyhound.core.producer.ProducerRecord
import com.wixpress.dst.greyhound.core.testkit.{BaseTestWithSharedEnv, TestMetrics}
import com.wixpress.dst.greyhound.core.zioutils.CountDownLatch
import com.wixpress.dst.greyhound.testenv.ITEnv
import com.wixpress.dst.greyhound.testenv.ITEnv.{Env, TestResources, testResources}
import com.wixpress.dst.greyhound.testenv.ITEnv.{testResources, Env, TestResources}
import org.apache.kafka.common.config.TopicConfig.{DELETE_RETENTION_MS_CONFIG, MAX_MESSAGE_BYTES_CONFIG, RETENTION_MS_CONFIG}
import org.apache.kafka.common.errors.InvalidTopicException
import org.specs2.specification.core.Fragments
Expand Down Expand Up @@ -83,7 +83,7 @@ class AdminClientIT extends BaseTestWithSharedEnv[Env, TestResources] {
}
}

//todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// "reflect errors" in {
// val topic1 = aTopicConfig()
// val topic2 = aTopicConfig("x" * 250)
Expand All @@ -104,7 +104,7 @@ class AdminClientIT extends BaseTestWithSharedEnv[Env, TestResources] {
// created === Map(badTopic.name -> None)
// }
// }
//todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// todo uncomment this after https://github.com/wix-private/core-server-build-tools/pull/13043 is merged
// =================================================================================================================================
"ignore TopicExistsException by default" in {
val topic = aTopicConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,10 @@ object UnsafeOffsetOperations {
}

override def offsetsForTimes(partitions: Set[TopicPartition], timeEpoch: Long, timeout: Duration): Map[TopicPartition, Option[Long]] =
consumer.offsetsForTimes(partitions.map(_.asKafka).map(tp => (tp, new lang.Long(timeEpoch))).toMap.asJava, timeout)
.asScala.toMap.map { case (tp, of) => TopicPartition(tp) -> (Option(of).map(_.offset())) }
consumer
.offsetsForTimes(partitions.map(_.asKafka).map(tp => (tp, new lang.Long(timeEpoch))).toMap.asJava, timeout)
.asScala
.toMap
.map { case (tp, of) => TopicPartition(tp) -> (Option(of).map(_.offset())) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ object EventLoop {
partitionsAssigned <- Promise.make[Nothing, Unit]
// TODO how to handle errors in subscribe?
rebalanceListener = listener(pausedPartitionsRef, config, dispatcher, partitionsAssigned, group, consumer, clientId, offsets)
_ <- report(SubscribingToInitialSubAndRebalanceListener(clientId, group, consumerAttributes))
_ <- report(SubscribingToInitialSubAndRebalanceListener(clientId, group, consumerAttributes))
_ <- subscribe(initialSubscription, rebalanceListener)(consumer)
running <- Ref.make[EventLoopState](Running)
_ <- report(CreatingPollOnceFiber(clientId, group, consumerAttributes))
_ <- report(CreatingPollOnceFiber(clientId, group, consumerAttributes))
fiber <- pollOnce(running, consumer, dispatcher, pausedPartitionsRef, positionsRef, offsets, config, clientId, group)
.repeatWhile(_ == true)
.forkDaemon
_ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes))
_ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes))
_ <- partitionsAssigned.await
env <- ZIO.environment[Env]
} yield (dispatcher, fiber, offsets, positionsRef, running, rebalanceListener.provideEnvironment(env))
Expand Down Expand Up @@ -303,9 +303,11 @@ object EventLoopMetric {

case class FailedToUpdatePositions(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

case class CreatingDispatcher(clientId: ClientId, group: Group, attributes: Map[String, String], startPaused: Boolean) extends EventLoopMetric
case class CreatingDispatcher(clientId: ClientId, group: Group, attributes: Map[String, String], startPaused: Boolean)
extends EventLoopMetric

case class SubscribingToInitialSubAndRebalanceListener(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric
case class SubscribingToInitialSubAndRebalanceListener(clientId: ClientId, group: Group, attributes: Map[String, String])
extends EventLoopMetric

case class CreatingPollOnceFiber(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ class OffsetsInitializer(
offsetOperations.pause(toPause)
val rewindUncommittedOffsets =
if (offsetResetIsEarliest || notCommitted.isEmpty || rewindUncommittedOffsetsBy.isZero) Map.empty
else offsetOperations.offsetsForTimes(notCommitted, clock.millis() - rewindUncommittedOffsetsBy.toMillis, effectiveTimeout)
.map{case (tp, maybeRewindedOffset) => (tp, maybeRewindedOffset.orElse(endOffsets.get(tp)).getOrElse(0L))}
else
offsetOperations
.offsetsForTimes(notCommitted, clock.millis() - rewindUncommittedOffsetsBy.toMillis, effectiveTimeout)
.map { case (tp, maybeRewindedOffset) => (tp, maybeRewindedOffset.orElse(endOffsets.get(tp)).getOrElse(0L)) }

val positions =
notCommitted.map(tp => tp -> offsetOperations.position(tp, effectiveTimeout)).toMap ++ toOffsets ++ rewindUncommittedOffsets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ case class ReportingConsumer(clientId: ClientId, group: Group, internal: Consume
implicit trace: Trace
): UIO[DelayedRebalanceEffect] =
(report(PartitionsRevoked(clientId, group, partitions, config.consumerAttributes)) *>
rebalanceListener.onPartitionsRevoked(consumer, partitions)
.timed.tap { case (duration, _) => report(PartitionsRevokedComplete(clientId, group, partitions, config.consumerAttributes, duration.toMillis)) }
.map(_._2)
).provideEnvironment(r)
rebalanceListener
.onPartitionsRevoked(consumer, partitions)
.timed
.tap {
case (duration, _) =>
report(PartitionsRevokedComplete(clientId, group, partitions, config.consumerAttributes, duration.toMillis))
}
.map(_._2)).provideEnvironment(r)

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] =
(report(PartitionsAssigned(clientId, group, partitions, config.consumerAttributes)) *>
Expand Down Expand Up @@ -241,11 +245,13 @@ object ConsumerMetric {
attributes: Map[String, String] = Map.empty
) extends ConsumerMetric

case class PartitionsRevokedComplete(clientId: ClientId,
group: Group,
partitions: Set[TopicPartition],
attributes: Map[String, String] = Map.empty,
durationMs: Long) extends ConsumerMetric
case class PartitionsRevokedComplete(
clientId: ClientId,
group: Group,
partitions: Set[TopicPartition],
attributes: Map[String, String] = Map.empty,
durationMs: Long
) extends ConsumerMetric

case class SubscribeFailed(clientId: ClientId, group: Group, error: Throwable, attributes: Map[String, String] = Map.empty)
extends ConsumerMetric
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package com.wixpress.dst.greyhound.core.consumer.retry

import java.time.{Instant, Duration => JavaDuration}
import java.time.{Duration => JavaDuration, Instant}
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.regex.Pattern
import com.wixpress.dst.greyhound.core.Serdes.StringSerde
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription.{TopicPattern, Topics}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryAttempt.{RetryAttemptNumber, currentTime}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryAttempt.{currentTime, RetryAttemptNumber}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryDecision.{NoMoreRetries, RetryWith}
import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, ConsumerSubscription}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandlerMetric.WaitingForRetry
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics.report
import com.wixpress.dst.greyhound.core.producer.ProducerRecord
import com.wixpress.dst.greyhound.core.{Group, Headers, Topic, durationDeserializer, instantDeserializer}
import com.wixpress.dst.greyhound.core.{durationDeserializer, instantDeserializer, Group, Headers, Topic}
import zio.Clock
import zio.Duration
import zio.Schedule.spaced
Expand All @@ -24,14 +24,14 @@ trait NonBlockingRetryHelper {
def retryTopicsFor(originalTopic: Topic): Set[Topic]

def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)(
implicit trace: Trace
implicit trace: Trace
): UIO[Option[RetryAttempt]]

def retryDecision[E](
retryAttempt: Option[RetryAttempt],
record: ConsumerRecord[Chunk[Byte], Chunk[Byte]],
error: E,
subscription: ConsumerSubscription
retryAttempt: Option[RetryAttempt],
record: ConsumerRecord[Chunk[Byte], Chunk[Byte]],
error: E,
subscription: ConsumerSubscription
)(implicit trace: Trace): URIO[Any, RetryDecision]

def retrySteps = retryTopicsFor("").size
Expand All @@ -47,46 +47,44 @@ object NonBlockingRetryHelper {
.getOrElse(NonBlockingBackoffPolicy.empty)

override def retryTopicsFor(topic: Topic): Set[Topic] =
policy(topic).intervals.indices.foldLeft(Set.empty[String])((acc, attempt) =>
acc + s"$topic-$group-retry-$attempt"
)
policy(topic).intervals.indices.foldLeft(Set.empty[String])((acc, attempt) => acc + s"$topic-$group-retry-$attempt")

override def retryAttempt(topic: Topic, headers: Headers, subscription: ConsumerSubscription)(
implicit trace: Trace
implicit trace: Trace
): UIO[Option[RetryAttempt]] = {
(for {
submitted <- headers.get(RetryHeader.Submitted, instantDeserializer)
backoff <- headers.get(RetryHeader.Backoff, durationDeserializer)
originalTopic <- headers.get[String](RetryHeader.OriginalTopic, StringSerde)
} yield for {
ta <- topicAttempt(subscription, topic, originalTopic)
ta <- topicAttempt(subscription, topic, originalTopic)
TopicAttempt(originalTopic, attempt) = ta
s <- submitted
b <- backoff
s <- submitted
b <- backoff
} yield RetryAttempt(originalTopic, attempt, s, b))
.catchAll(_ => ZIO.none)
}

private def topicAttempt(
subscription: ConsumerSubscription,
topic: Topic,
originalTopicHeader: Option[String]
subscription: ConsumerSubscription,
topic: Topic,
originalTopicHeader: Option[String]
) =
subscription match {
case _: Topics => extractTopicAttempt(group, topic)
case _: Topics => extractTopicAttempt(group, topic)
case _: TopicPattern =>
extractTopicAttemptFromPatternRetryTopic(group, topic, originalTopicHeader)
}

override def retryDecision[E](
retryAttempt: Option[RetryAttempt],
record: ConsumerRecord[Chunk[Byte], Chunk[Byte]],
error: E,
subscription: ConsumerSubscription
retryAttempt: Option[RetryAttempt],
record: ConsumerRecord[Chunk[Byte], Chunk[Byte]],
error: E,
subscription: ConsumerSubscription
)(implicit trace: Trace): URIO[Any, RetryDecision] = currentTime.map(now => {
val nextRetryAttempt = retryAttempt.fold(0)(_.attempt + 1)
val originalTopic = retryAttempt.fold(record.topic)(_.originalTopic)
val retryTopic = subscription match {
val retryTopic = subscription match {
case _: TopicPattern => patternRetryTopic(group, nextRetryAttempt)
case _: Topics => fixedRetryTopic(originalTopic, group, nextRetryAttempt)
}
Expand Down Expand Up @@ -123,19 +121,19 @@ object NonBlockingRetryHelper {
inputTopic.split(s"-$group-retry-").toSeq match {
case Seq(topic, attempt) if Try(attempt.toInt).isSuccess =>
Some(TopicAttempt(topic, attempt.toInt))
case _ => None
case _ => None
}

private def extractTopicAttemptFromPatternRetryTopic[E](
group: Group,
inputTopic: Topic,
originalTopicHeader: Option[String]
group: Group,
inputTopic: Topic,
originalTopicHeader: Option[String]
) = {
originalTopicHeader.flatMap(originalTopic => {
inputTopic.split(s"__gh_pattern-retry-$group-attempt-").toSeq match {
case Seq(_, attempt) if Try(attempt.toInt).isSuccess =>
Some(TopicAttempt(originalTopic, attempt.toInt))
case _ => None
case _ => None
}
})
}
Expand Down Expand Up @@ -176,14 +174,14 @@ object RetryHeader {
}

case class RetryAttempt(
originalTopic: Topic,
attempt: RetryAttemptNumber,
submittedAt: Instant,
backoff: Duration
originalTopic: Topic,
attempt: RetryAttemptNumber,
submittedAt: Instant,
backoff: Duration
) {

def sleep(implicit trace: Trace): URIO[GreyhoundMetrics, Unit] =
(RetryUtil.sleep(submittedAt, backoff) race reportWaitingInIntervals(every = 60.seconds))
RetryUtil.sleep(submittedAt, backoff) race reportWaitingInIntervals(every = 60.seconds)

private def reportWaitingInIntervals(every: Duration) =
report(WaitingForRetry(originalTopic, attempt, submittedAt.toEpochMilli, backoff.toMillis))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ private[retry] object NonBlockingRetryRecordHandler {
}

private def delayRetry(record: ConsumerRecord[_, _], awaitShutdown: TopicPartition => UIO[AwaitShutdown])(
retryAttempt: RetryAttempt) =
retryAttempt: RetryAttempt
) =
zio.Random.nextInt.flatMap(correlationId =>
report(
WaitingBeforeRetry(record.topic, retryAttempt, record.partition, record.offset, correlationId)
) *>
awaitShutdown(record.topicPartition)
.flatMap(_.interruptOnShutdown(retryAttempt.sleep))
.reporting(r => DoneWaitingBeforeRetry(record.topic, record.partition, record.offset, retryAttempt, r.duration, r.failed, correlationId))
.reporting(r =>
DoneWaitingBeforeRetry(record.topic, record.partition, record.offset, retryAttempt, r.duration, r.failed, correlationId)
)
)

override def isHandlingRetryTopicMessage(group: Group, record: ConsumerRecord[K, V]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ object RetryRecordHandlerMetric {
case class NoRetryOnNonRetryableFailure(partition: TopicPartition, offset: Long, cause: Exception) extends RetryRecordHandlerMetric
case object Silent extends RetryRecordHandlerMetric

case class WaitingBeforeRetry(retryTopic: Topic, retryAttempt: RetryAttempt, partition: Int, offset:Long, correlationId: Int) extends RetryRecordHandlerMetric
case class WaitingBeforeRetry(retryTopic: Topic, retryAttempt: RetryAttempt, partition: Int, offset: Long, correlationId: Int)
extends RetryRecordHandlerMetric

case class DoneWaitingBeforeRetry(
retryTopic: Topic,
Expand Down
Loading

0 comments on commit 743c6a6

Please sign in to comment.