diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java index 63d26dd9ae3..bf6c68564ec 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java @@ -21,7 +21,11 @@ public abstract SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition( public abstract void unsubscribeAll(PubSubTopic versionTopic); - public abstract void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition); + public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { + unSubscribe(versionTopic, pubSubTopicPartition, SharedKafkaConsumer.DEFAULT_MAX_WAIT_MS); + } + + public abstract void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs); public abstract void batchUnsubscribe(PubSubTopic versionTopic, Set topicPartitionsToUnSub); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index a4f1a170953..e8d6e8869b7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -1143,7 +1143,7 @@ protected void leaderExecuteTopicSwitch( newSourceTopic, unreachableBrokerList); // unsubscribe the old source and subscribe to the new source - consumerUnSubscribe(currentLeaderTopic, partitionConsumptionState); + consumerUnSubscribeForStateTransition(currentLeaderTopic, 
partitionConsumptionState); waitForLastLeaderPersistFuture( partitionConsumptionState, String.format( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java index 628b93b510f..9c80e6674e3 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java @@ -364,8 +364,15 @@ void resetOffsetFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPa } public void unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { + unsubscribeConsumerFor(versionTopic, pubSubTopicPartition, SharedKafkaConsumer.DEFAULT_MAX_WAIT_MS); + } + + public void unsubscribeConsumerFor( + PubSubTopic versionTopic, + PubSubTopicPartition pubSubTopicPartition, + long timeoutMs) { for (AbstractKafkaConsumerService consumerService: kafkaServerToConsumerServiceMap.values()) { - consumerService.unSubscribe(versionTopic, pubSubTopicPartition); + consumerService.unSubscribe(versionTopic, pubSubTopicPartition, timeoutMs); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java index b7d76bb9309..24110dbea64 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java @@ -243,15 +243,15 @@ public void unsubscribeAll(PubSubTopic versionTopic) { * Stop specific subscription associated with the given version topic. 
*/ @Override - public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { - PubSubConsumerAdapter consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition); + public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs) { + SharedKafkaConsumer consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition); if (consumer != null) { /** * Refer {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for avoiding race condition caused by * setting data receiver and unsubscribing concurrently for the same topic partition on a shared consumer. */ synchronized (consumer) { - consumer.unSubscribe(pubSubTopicPartition); + consumer.unSubscribe(pubSubTopicPartition, timeoutMs); removeTopicPartitionFromConsumptionTask(consumer, pubSubTopicPartition); } versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java index 8947726ad43..d018a000a00 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java @@ -153,10 +153,10 @@ public void unsubscribeAll(PubSubTopic versionTopic) { } @Override - public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { + public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs) { KafkaConsumerService kafkaConsumerService = getKafkaConsumerService(versionTopic, pubSubTopicPartition); if (kafkaConsumerService != null) { - kafkaConsumerService.unSubscribe(versionTopic, 
pubSubTopicPartition); + kafkaConsumerService.unSubscribe(versionTopic, pubSubTopicPartition, timeoutMs); topicPartitionToConsumerService.remove(new TopicPartitionForIngestion(versionTopic, pubSubTopicPartition)); } else { LOGGER.warn( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index a09a201c587..3c866e91027 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -508,7 +508,7 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws PubSubTopic topic = message.getTopicPartition().getPubSubTopic(); PubSubTopic leaderTopic = offsetRecord.getLeaderTopic(pubSubTopicRepository); if (leaderTopic != null && (!topic.equals(leaderTopic) || partitionConsumptionState.consumeRemotely())) { - consumerUnSubscribe(leaderTopic, partitionConsumptionState); + consumerUnSubscribeForStateTransition(leaderTopic, partitionConsumptionState); waitForAllMessageToBeProcessedFromTopicPartition( new PubSubTopicPartitionImpl(leaderTopic, partition), partitionConsumptionState); @@ -605,7 +605,7 @@ protected void checkLongRunningTaskState() throws InterruptedException { * this replica can finally be promoted to leader. 
*/ // unsubscribe from previous topic/partition - consumerUnSubscribe(versionTopic, partitionConsumptionState); + consumerUnSubscribeForStateTransition(versionTopic, partitionConsumptionState); LOGGER.info( "Starting promotion of replica: {} to leader for the partition, unsubscribed from current topic: {}", @@ -675,7 +675,7 @@ protected void checkLongRunningTaskState() throws InterruptedException { /** If LEADER is consuming remote VT or SR, EOP is already received, switch back to local fabrics. */ if (shouldLeaderSwitchToLocalConsumption(partitionConsumptionState)) { // Unsubscribe from remote Kafka topic, but keep the consumer in cache. - consumerUnSubscribe(currentLeaderTopic, partitionConsumptionState); + consumerUnSubscribeForStateTransition(currentLeaderTopic, partitionConsumptionState); // If remote consumption flag is false, existing messages for the partition in the drainer queue should be // processed before that PubSubTopicPartition leaderTopicPartition = @@ -1015,7 +1015,7 @@ protected void leaderExecuteTopicSwitch( } // unsubscribe the old source and subscribe to the new source - consumerUnSubscribe(currentLeaderTopic, partitionConsumptionState); + consumerUnSubscribeForStateTransition(currentLeaderTopic, partitionConsumptionState); waitForLastLeaderPersistFuture( partitionConsumptionState, String.format( @@ -3816,7 +3816,7 @@ protected void resubscribe(PartitionConsumptionState partitionConsumptionState) protected void resubscribeAsFollower(PartitionConsumptionState partitionConsumptionState) throws InterruptedException { int partition = partitionConsumptionState.getPartition(); - consumerUnSubscribe(versionTopic, partitionConsumptionState); + consumerUnSubscribeForStateTransition(versionTopic, partitionConsumptionState); waitForAllMessageToBeProcessedFromTopicPartition( new PubSubTopicPartitionImpl(versionTopic, partition), partitionConsumptionState); @@ -3836,7 +3836,7 @@ protected void resubscribeAsLeader(PartitionConsumptionState 
partitionConsumptio OffsetRecord offsetRecord = partitionConsumptionState.getOffsetRecord(); PubSubTopic leaderTopic = offsetRecord.getLeaderTopic(pubSubTopicRepository); int partition = partitionConsumptionState.getPartition(); - consumerUnSubscribe(leaderTopic, partitionConsumptionState); + consumerUnSubscribeForStateTransition(leaderTopic, partitionConsumptionState); PubSubTopicPartition leaderTopicPartition = new PubSubTopicPartitionImpl(leaderTopic, partition); waitForAllMessageToBeProcessedFromTopicPartition( new PubSubTopicPartitionImpl(leaderTopic, partition), diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java index 735554a5e0d..21f82134ad7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -39,8 +40,15 @@ * TODO: move this logic inside consumption task, this class does not need to be sub-class of {@link PubSubConsumerAdapter} */ class SharedKafkaConsumer implements PubSubConsumerAdapter { + public static final long DEFAULT_MAX_WAIT_MS = TimeUnit.SECONDS.toMillis(10); + /** + * Increase the max wait during state transitions to ensure that it waits for the messages to finish processing. A + * poll() indicates that all previous inflight messages under the previous state were processed, so there can't be a + * state mismatch. The consumer_records_producing_to_write_buffer_latency metric suggests how long the wait should be. 
+ */ + public static final long STATE_TRANSITION_MAX_WAIT_MS = TimeUnit.MINUTES.toMillis(30); + private static final Logger LOGGER = LogManager.getLogger(SharedKafkaConsumer.class); - private long nextPollTimeOutSeconds = 10; protected final PubSubConsumerAdapter delegate; @@ -61,6 +69,8 @@ class SharedKafkaConsumer implements PubSubConsumerAdapter { */ private final AtomicBoolean waitingForPoll = new AtomicBoolean(false); + private long timeoutMsOverride = -1; // for unit test purposes + private final Time time; /** @@ -145,20 +155,24 @@ synchronized void subscribe( updateCurrentAssignment(delegate.getAssignment()); } + @Override + public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition) { + unSubscribe(pubSubTopicPartition, DEFAULT_MAX_WAIT_MS); + } + /** * There is an additional goal of this function which is to make sure that all the records consumed for this {topic,partition} prior to * calling unsubscribe here is produced to drainer service. {@link ConsumptionTask#run()} ends up calling * {@link SharedKafkaConsumer#poll(long)} and produceToStoreBufferService sequentially. So waiting for at least one more * invocation of {@link SharedKafkaConsumer#poll(long)} achieves the above objective. 
*/ - @Override - public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition) { + public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition, long timeoutMs) { unSubscribeAction(() -> { this.delegate.unSubscribe(pubSubTopicPartition); PubSubTopic versionTopic = subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition); unsubscriptionListener.call(this, versionTopic, pubSubTopicPartition); return Collections.singleton(pubSubTopicPartition); - }); + }, timeoutMs); } @Override @@ -170,7 +183,7 @@ public synchronized void batchUnsubscribe(Set pubSubTopicP unsubscriptionListener.call(this, versionTopic, pubSubTopicPartition); } return pubSubTopicPartitionSet; - }); + }, DEFAULT_MAX_WAIT_MS); } /** @@ -179,7 +192,7 @@ public synchronized void batchUnsubscribe(Set pubSubTopicP * * @param supplier which performs the unsubscription and returns a set of partitions which were unsubscribed */ - protected synchronized void unSubscribeAction(Supplier> supplier) { + protected synchronized void unSubscribeAction(Supplier> supplier, long timeoutMs) { long currentPollTimes = pollTimes; Set topicPartitions = supplier.get(); long startTime = System.currentTimeMillis(); @@ -191,28 +204,44 @@ protected synchronized void unSubscribeAction(Supplier topicPartitions, elapsedTime); updateCurrentAssignment(delegate.getAssignment()); - waitAfterUnsubscribe(currentPollTimes, topicPartitions); + waitAfterUnsubscribe(currentPollTimes, topicPartitions, timeoutMs); } - protected void waitAfterUnsubscribe(long currentPollTimes, Set topicPartitions) { + protected void waitAfterUnsubscribe( + long currentPollTimes, + Set topicPartitions, + long timeoutMs) { + // This clause is only for unit test purposes, for when the timeout needs to be set to 0. + if (timeoutMsOverride != -1) { + timeoutMs = timeoutMsOverride; + } + currentPollTimes++; waitingForPoll.set(true); // Wait for the next poll or maximum 10 seconds. 
Interestingly wait api does not provide any indication if wait // returned // due to timeout. So an explicit time check is necessary. - long timeoutMs = (time.getNanoseconds() / Time.NS_PER_MS) + nextPollTimeOutSeconds * Time.MS_PER_SECOND; + final long startTimeMs = time.getMilliseconds(); + final long endTimeMs = startTimeMs + timeoutMs; try { while (currentPollTimes > pollTimes) { - long waitMs = timeoutMs - (time.getNanoseconds() / Time.NS_PER_MS); + final long waitMs = endTimeMs - time.getMilliseconds(); if (waitMs <= 0) { LOGGER.warn( - "Wait for poll request after unsubscribe topic partition(s) ({}) timed out after {} seconds", + "Wait for poll request after unsubscribe topic partition(s) ({}) timed out after {} milliseconds", topicPartitions, - nextPollTimeOutSeconds); + timeoutMs); break; } wait(waitMs); } + final long elapsedMs = time.getMilliseconds() - startTimeMs; + if (elapsedMs > TimeUnit.SECONDS.toMillis(15)) { + LOGGER.warn( + "Wait for poll request after unsubscribe topic partition(s) ({}) took {} milliseconds", + topicPartitions, + elapsedMs); + } // no action to take actually, just return; } catch (InterruptedException e) { LOGGER.info("Wait for poll request in `unsubscribe` function got interrupted."); @@ -221,8 +250,8 @@ protected void waitAfterUnsubscribe(long currentPollTimes, Set partitionsFor(PubSubTopic topic) { throw new UnsupportedOperationException("partitionsFor is not supported in SharedKafkaConsumer"); } - - // Test only - public void setNextPollTimeOutSeconds(long seconds) { - this.nextPollTimeOutSeconds = seconds; - } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 8ac571b5d1c..aa035fec0e3 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -3401,11 +3401,20 @@ public boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionSt .hasConsumerAssignedFor(versionTopic, new PubSubTopicPartitionImpl(topic, partitionId)); } - public void consumerUnSubscribe(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState) { + /** + * It is important during a state transition to wait in {@link SharedKafkaConsumer#waitAfterUnsubscribe(long, Set, long)} + * until all inflight messages have been processed by the consumer, otherwise there could be a mismatch in the PCS's + * leader-follower state vs the intended state when the message was polled. Thus, we use an increased timeout of up to + * 30 minutes according to the maximum value of the metric consumer_records_producing_to_write_buffer_latency. + */ + public void consumerUnSubscribeForStateTransition( + PubSubTopic topic, + PartitionConsumptionState partitionConsumptionState) { Instant startTime = Instant.now(); int partitionId = partitionConsumptionState.getPartition(); PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, partitionId); - aggKafkaConsumerService.unsubscribeConsumerFor(versionTopic, topicPartition); + aggKafkaConsumerService + .unsubscribeConsumerFor(versionTopic, topicPartition, SharedKafkaConsumer.STATE_TRANSITION_MAX_WAIT_MS); LOGGER.info( "Consumer unsubscribed to topic-partition: {} for replica: {}. 
Took {} ms", topicPartition, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java index cc9f9324673..4ddba90b065 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java @@ -65,11 +65,41 @@ public static Object[][] methodList() { { "unSubscribe" }, { "getOffsetLagBasedOnMetrics" }, { "getLatestOffsetBasedOnMetrics" } }; } + private void invokeAndVerify( + KafkaConsumerServiceDelegator delegator, + KafkaConsumerService invokedConsumerService, + KafkaConsumerService unusedConsumerService, + PubSubTopic versionTopic, + PubSubTopicPartition topicPartition, + String methodName) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + boolean includeLongParam = methodName.equals("unSubscribe"); + if (includeLongParam) { + Method testMethod = KafkaConsumerServiceDelegator.class + .getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class, long.class); + Method verifyMethod = + KafkaConsumerService.class.getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class, long.class); + testMethod.invoke(delegator, versionTopic, topicPartition, 0L); + verifyMethod.invoke(verify(invokedConsumerService), versionTopic, topicPartition, 0L); + verifyMethod.invoke(verify(unusedConsumerService, never()), versionTopic, topicPartition, 0L); + } else { + Method testMethod = + KafkaConsumerServiceDelegator.class.getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class); + Method verifyMethod = + KafkaConsumerService.class.getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class); + testMethod.invoke(delegator, versionTopic, topicPartition); + 
verifyMethod.invoke(verify(invokedConsumerService), versionTopic, topicPartition); + verifyMethod.invoke(verify(unusedConsumerService, never()), versionTopic, topicPartition); + } + + reset(invokedConsumerService); + reset(unusedConsumerService); + } + @Test(dataProvider = "Method-List") public void chooseConsumerServiceTest(String methodName) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - KafkaConsumerService mockDefaultConsumerService = mock(KafkaConsumerService.class); - KafkaConsumerService mockDedicatedConsumerService = mock(KafkaConsumerService.class); + KafkaConsumerService defaultMockService = mock(KafkaConsumerService.class); + KafkaConsumerService dedicatedMockService = mock(KafkaConsumerService.class); VeniceServerConfig mockConfig = mock(VeniceServerConfig.class); doReturn(true).when(mockConfig).isDedicatedConsumerPoolForAAWCLeaderEnabled(); doReturn(KafkaConsumerServiceDelegator.ConsumerPoolStrategyType.AA_OR_WC_LEADER_DEDICATED).when(mockConfig) @@ -78,8 +108,8 @@ public void chooseConsumerServiceTest(String methodName) Function isAAWCStoreFunc = vt -> true; KafkaConsumerServiceDelegator.KafkaConsumerServiceBuilder consumerServiceBuilder = (ignored, poolType) -> poolType.equals(ConsumerPoolType.REGULAR_POOL) - ? mockDefaultConsumerService - : mockDedicatedConsumerService; + ? 
defaultMockService + : dedicatedMockService; KafkaConsumerServiceDelegator delegator = new KafkaConsumerServiceDelegator(mockConfig, consumerServiceBuilder, isAAWCStoreFunc); @@ -105,35 +135,16 @@ public void chooseConsumerServiceTest(String methodName) PartitionReplicaIngestionContext.WorkloadType.NON_AA_OR_WRITE_COMPUTE); delegator.startConsumptionIntoDataReceiver(topicPartitionIngestionContextForRT, 0, dataReceiver); - Method testMethod = - KafkaConsumerServiceDelegator.class.getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class); - Method verifyMethod = - KafkaConsumerService.class.getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class); - - testMethod.invoke(delegator, versionTopic, topicPartitionForVT); - verifyMethod.invoke(verify(mockDefaultConsumerService), versionTopic, topicPartitionForVT); - verifyMethod.invoke(verify(mockDedicatedConsumerService, never()), versionTopic, topicPartitionForVT); - reset(mockDefaultConsumerService); - reset(mockDedicatedConsumerService); - testMethod.invoke(delegator, versionTopic, topicPartitionForRT); - verifyMethod.invoke(verify(mockDedicatedConsumerService), versionTopic, topicPartitionForRT); - verifyMethod.invoke(verify(mockDefaultConsumerService, never()), versionTopic, topicPartitionForRT); - reset(mockDefaultConsumerService); - reset(mockDedicatedConsumerService); + invokeAndVerify(delegator, defaultMockService, dedicatedMockService, versionTopic, topicPartitionForVT, methodName); + invokeAndVerify(delegator, dedicatedMockService, defaultMockService, versionTopic, topicPartitionForRT, methodName); isAAWCStoreFunc = vt -> false; delegator = new KafkaConsumerServiceDelegator(mockConfig, consumerServiceBuilder, isAAWCStoreFunc); delegator.startConsumptionIntoDataReceiver(topicPartitionIngestionContextForVT, 0, dataReceiver); delegator.startConsumptionIntoDataReceiver(topicPartitionIngestionContextForRT, 0, dataReceiver); - testMethod.invoke(delegator, versionTopic, topicPartitionForVT); - 
verifyMethod.invoke(verify(mockDefaultConsumerService), versionTopic, topicPartitionForVT); - verifyMethod.invoke(verify(mockDedicatedConsumerService, never()), versionTopic, topicPartitionForVT); - reset(mockDefaultConsumerService); - reset(mockDedicatedConsumerService); - testMethod.invoke(delegator, versionTopic, topicPartitionForRT); - verifyMethod.invoke(verify(mockDefaultConsumerService), versionTopic, topicPartitionForRT); - verifyMethod.invoke(verify(mockDedicatedConsumerService, never()), versionTopic, topicPartitionForRT); + invokeAndVerify(delegator, defaultMockService, dedicatedMockService, versionTopic, topicPartitionForVT, methodName); + invokeAndVerify(delegator, defaultMockService, dedicatedMockService, versionTopic, topicPartitionForRT, methodName); } @Test @@ -321,7 +332,8 @@ public void consumerAssignmentStickiness() { // Change the AAWC flag retValueForIsAAWCStoreFunc.set(true); delegator.unSubscribe(versionTopic, topicPartitionForRT); - verify(mockDefaultConsumerService).unSubscribe(versionTopic, topicPartitionForRT); + verify(mockDefaultConsumerService) + .unSubscribe(versionTopic, topicPartitionForRT, SharedKafkaConsumer.DEFAULT_MAX_WAIT_MS); verify(mockDedicatedConsumerService, never()).unSubscribe(versionTopic, topicPartitionForRT); } @@ -614,7 +626,7 @@ private Runnable getResubscriptionRunnableFor( consumerServiceDelegator .startConsumptionIntoDataReceiver(partitionReplicaIngestionContext, 0, consumedDataReceiver); // Avoid wait time here to increase the chance for race condition. 
- consumerServiceDelegator.assignConsumerFor(versionTopic, pubSubTopicPartition).setNextPollTimeOutSeconds(0); + consumerServiceDelegator.assignConsumerFor(versionTopic, pubSubTopicPartition).setTimeoutMsOverride(0L); int versionNum = Version.parseVersionFromKafkaTopicName(partitionReplicaIngestionContext.getVersionTopic().getName()); if (versionNum % 3 == 0) { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java index fd0fd52d9e2..897cf4f121b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -72,7 +73,7 @@ public void testSubscriptionEmptyPoll() { private void setUpSharedConsumer() { consumerAdapter = mock(PubSubConsumerAdapter.class); - AggKafkaConsumerServiceStats stats = mock(AggKafkaConsumerServiceStats.class); + stats = mock(AggKafkaConsumerServiceStats.class); Runnable assignmentChangeListener = mock(Runnable.class); SharedKafkaConsumer.UnsubscriptionListener unsubscriptionListener = mock(SharedKafkaConsumer.UnsubscriptionListener.class); @@ -91,13 +92,11 @@ private void setUpSharedConsumer() { public void testWaitAfterUnsubscribe() { setUpSharedConsumer(); Supplier> supplier = () -> topicPartitions; - - long poolTimesBeforeUnsubscribe = sharedKafkaConsumer.getPollTimes(); - sharedKafkaConsumer.setNextPollTimeoutSeconds(1); - sharedKafkaConsumer.unSubscribeAction(supplier); + long pollTimesBeforeUnsubscribe = sharedKafkaConsumer.getPollTimes(); + sharedKafkaConsumer.unSubscribeAction(supplier, 
TimeUnit.SECONDS.toMillis(1)); // This is to test that if the poll time is not incremented when the consumer is unsubscribed the correct log can // be found in the logs. - Assert.assertEquals(poolTimesBeforeUnsubscribe, sharedKafkaConsumer.getPollTimes()); + Assert.assertEquals(pollTimesBeforeUnsubscribe, sharedKafkaConsumer.getPollTimes()); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 4966daf8dc5..53e5b750893 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -1185,6 +1185,27 @@ private void prepareAggKafkaConsumerServiceMock() { return null; }).when(aggKafkaConsumerService).unsubscribeConsumerFor(any(), any()); + doAnswer(invocation -> { + PubSubTopic versionTopic = invocation.getArgument(0, PubSubTopic.class); + PubSubTopicPartition pubSubTopicPartition = invocation.getArgument(1, PubSubTopicPartition.class); + Long timeoutMs = invocation.getArgument(2, Long.class); + /** + * The internal {@link SharedKafkaConsumer} has special logic for unsubscription to avoid some race condition + * between the fast unsubscribe and re-subscribe. + * Please check {@link SharedKafkaConsumer#unSubscribe} to find more details. + * + * We shouldn't use {@link #mockLocalKafkaConsumer} or {@link #inMemoryRemoteKafkaConsumer} here since + * they don't have the proper synchronization. 
+ */ + if (inMemoryLocalKafkaConsumer.hasSubscription(pubSubTopicPartition)) { + localKafkaConsumerService.unSubscribe(versionTopic, pubSubTopicPartition, timeoutMs.longValue()); + } + if (inMemoryRemoteKafkaConsumer.hasSubscription(pubSubTopicPartition)) { + remoteKafkaConsumerService.unSubscribe(versionTopic, pubSubTopicPartition, timeoutMs.longValue()); + } + return null; + }).when(aggKafkaConsumerService).unsubscribeConsumerFor(any(), any(), anyLong()); + doAnswer(invocation -> { PubSubTopicPartition pubSubTopicPartition = invocation.getArgument(1, PubSubTopicPartition.class); if (inMemoryLocalKafkaConsumer.hasSubscription(pubSubTopicPartition)) { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 231c7eb4825..dcadb283f27 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -118,6 +118,11 @@ private ConfigKeys() { public static final String PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_THREAD_POOL_SIZE = "pubsub.topic.manager.metadata.fetcher.thread.pool.size"; + /** + * How long to wait for the next poll request after unsubscribing, indicating that old messages were processed. + */ + public static final String SERVER_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS = "server.wait.after.unsubscribe.timeout.ms"; + // Cluster specific configs for controller public static final String CONTROLLER_NAME = "controller.name";