Skip to content

Commit

Permalink
[greyhound] remove internal topic creation - wix adapter (#33820)
Browse files Browse the repository at this point in the history
not create retry topics inside RecordConsumer builder, from wix adapter (it's already created). Removing last reference to an actual AdminClient in wix-adapter when asking to use proxy.

GitOrigin-RevId: fb1416141b6b05559b1b606576cc03556b6a78f4
  • Loading branch information
berman7 authored and wix-oss committed Sep 23, 2023
1 parent 73b59ce commit 5d4e16c
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ object RecordConsumer {
(initialSubscription, topicsToCreate) = config.retryConfig.fold((config.initialSubscription, Set.empty[Topic]))(policy =>
maybeAddRetryTopics(policy, config, nonBlockingRetryHelper)
)
_ <- AdminClient
_ <- ZIO.when(config.createRetryTopics)(AdminClient
.make(AdminClientConfig(config.bootstrapServers, config.kafkaAuthProperties), config.consumerAttributes)
.tap(client =>
client.createTopics(
topicsToCreate.map(topic =>
TopicConfig(topic, partitions = 1, replicationFactor = 1, cleanupPolicy = CleanupPolicy.Delete(86400000L))
)
)
)
))
blockingState <- Ref.make[Map[BlockingTarget, BlockingState]](Map.empty)
blockingStateResolver = BlockingStateResolver(blockingState)
workersShutdownRef <- Ref.make[Map[TopicPartition, ShutdownPromise]](Map.empty)
Expand Down Expand Up @@ -321,7 +321,8 @@ case class RecordConsumerConfig(
decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
retryProducerAttributes: Map[String, String] = Map.empty,
commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
rewindUncommittedOffsetsBy: Duration = 0.millis
rewindUncommittedOffsetsBy: Duration = 0.millis,
createRetryTopics: Boolean = true
) extends CommonGreyhoundConfig {

override def kafkaProps: Map[String, String] = extraProperties
Expand Down

0 comments on commit 5d4e16c

Please sign in to comment.