From 59ed17c9eb601735c979ecca8b711f327fb120e7 Mon Sep 17 00:00:00 2001 From: Noam Berman Date: Mon, 3 Apr 2023 17:26:15 +0300 Subject: [PATCH] [greyhound] remove internal topic creation - wix adapter (#33820) not create retry topics inside RecordConsumer builder, from wix adapter (it's already created). Removing last reference to an actual AdminClient in wix-adapter when asking to use proxy. GitOrigin-RevId: fb1416141b6b05559b1b606576cc03556b6a78f4 --- .../dst/greyhound/core/consumer/RecordConsumer.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala index 883f17a8..1b426d60 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala @@ -79,7 +79,7 @@ object RecordConsumer { (initialSubscription, topicsToCreate) = config.retryConfig.fold((config.initialSubscription, Set.empty[Topic]))(policy => maybeAddRetryTopics(policy, config, nonBlockingRetryHelper) ) - _ <- AdminClient + _ <- ZIO.when(config.createRetryTopics)(AdminClient .make(AdminClientConfig(config.bootstrapServers, config.kafkaAuthProperties), config.consumerAttributes) .tap(client => client.createTopics( @@ -87,7 +87,7 @@ object RecordConsumer { TopicConfig(topic, partitions = 1, replicationFactor = 1, cleanupPolicy = CleanupPolicy.Delete(86400000L)) ) ) - ) + )) blockingState <- Ref.make[Map[BlockingTarget, BlockingState]](Map.empty) blockingStateResolver = BlockingStateResolver(blockingState) workersShutdownRef <- Ref.make[Map[TopicPartition, ShutdownPromise]](Map.empty) @@ -321,7 +321,8 @@ case class RecordConsumerConfig( decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor, retryProducerAttributes: Map[String, String] = Map.empty, commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA, - rewindUncommittedOffsetsBy: Duration = 0.millis + rewindUncommittedOffsetsBy: Duration = 0.millis, + createRetryTopics: Boolean = true ) extends CommonGreyhoundConfig { override def kafkaProps: Map[String, String] = extraProperties