From e0a23f93552e150e3ee3ec13a444488f8f8bcd47 Mon Sep 17 00:00:00 2001 From: Kai-Sern Lim Date: Thu, 19 Dec 2024 17:20:49 -0800 Subject: [PATCH] =?UTF-8?q?`SharedKafkaConsumer.unSubscribe()`=20was=20mis?= =?UTF-8?q?sing=20the=20`timeoutMs`=20parameter,=20which=20was=20overwritt?= =?UTF-8?q?en=20as=20a=20merge=20conflict=20in=20https://github.com/linked?= =?UTF-8?q?in/venice/pull/1308.=20=F0=9F=AB=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../linkedin/davinci/kafka/consumer/KafkaConsumerService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java index 3b577442a3..7996b27107 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java @@ -262,7 +262,7 @@ public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTop * setting data receiver and unsubscribing concurrently for the same topic partition on a shared consumer. */ try (AutoCloseableLock ignored = AutoCloseableLock.of(consumerToLocks.get(consumer))) { - consumer.unSubscribe(pubSubTopicPartition); + consumer.unSubscribe(pubSubTopicPartition, timeoutMs); removeTopicPartitionFromConsumptionTask(consumer, pubSubTopicPartition); } versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {