From 8b70947bf2e6c6c1a44f973274e36e8fa23537e4 Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Thu, 12 Dec 2024 20:35:13 -0800 Subject: [PATCH] [server] Complete the corresponding consumer action future in SIT close In SIT internal close we drain the consumer action queue and process DROP_PARTITION and RESET_OFFSET messages. However, when we process them we did not complete their corresponding futures. This breaks any logic that is waiting on those futures and creates false-positive timeout errors. --- .../kafka/consumer/StoreIngestionTask.java | 26 ++++++++++++------- .../consumer/StoreIngestionTaskTest.java | 19 ++++++++++++++ 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 36cc7eb82a..65d7c95fce 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1803,26 +1803,34 @@ private void internalClose(boolean doFlush) { // Only reset Offset and Drop Partition Messages are important, subscribe/unsubscribe will be handled // on the restart by Helix Controller notifications on the new StoreIngestionTask. 
- try { - this.storeRepository.unregisterStoreDataChangedListener(this.storageUtilizationManager); - for (ConsumerAction message: consumerActionsQueue) { - ConsumerActionType opType = message.getType(); - String topic = message.getTopic(); - int partition = message.getPartition(); - String replica = Utils.getReplicaId(message.getTopic(), message.getPartition()); + this.storeRepository.unregisterStoreDataChangedListener(this.storageUtilizationManager); + for (ConsumerAction message: consumerActionsQueue) { + ConsumerActionType opType = message.getType(); + String topic = message.getTopic(); + int partition = message.getPartition(); + String replica = Utils.getReplicaId(message.getTopic(), message.getPartition()); + try { if (opType == ConsumerActionType.RESET_OFFSET) { LOGGER.info("Cleanup Reset OffSet. Replica: {}", replica); storageMetadataService.clearOffset(topic, partition); + message.getFuture().complete(null); } else if (opType == DROP_PARTITION) { PubSubTopicPartition topicPartition = message.getTopicPartition(); LOGGER.info("Processing DROP_PARTITION message for {} in internalClose", topicPartition); dropPartitionSynchronously(topicPartition); + message.getFuture().complete(null); } else { LOGGER.info("Cleanup ignoring the Message: {} Replica: {}", message, replica); } + } catch (Exception e) { + LOGGER.error( + "{} Error while handling consumer action: {} replica: {} in internalClose", + ingestionTaskName, + message, + replica, + e); + message.getFuture().completeExceptionally(e); } - } catch (Exception e) { - LOGGER.error("{} Error while handling message in internalClose", ingestionTaskName, e); } // Unsubscribe any topic partitions related to this version topic from the shared consumer. 
aggKafkaConsumerService.unsubscribeAll(versionTopic); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index e494b9df42..c174a8f584 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -5249,6 +5249,25 @@ public void testUnsubscribeFromTopic() { verify(storeIngestionTask, times(2)).consumerUnSubscribeForStateTransition(versionTopic, pcs); } + @Test(timeOut = 10 * Time.MS_PER_SECOND) + public void testStoreIngestionInternalClose() throws Exception { + AtomicReference> dropPartitionFuture = new AtomicReference<>(); + StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> { + dropPartitionFuture.set(storeIngestionTaskUnderTest.dropStoragePartitionGracefully(fooTopicPartition)); + }, AA_OFF); + // Mock out processConsumerActions to ensure the consumer action queue is not processed + config.setBeforeStartingConsumption(() -> { + try { + doNothing().when(storeIngestionTaskUnderTest).processConsumerActions(any()); + } catch (InterruptedException e) { + // ignored + } + }); + runTest(config); + // The drop partition consumer action should still be handled as part of internalClose + dropPartitionFuture.get().get(); + } + private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig( Consumer storeVersionConfigOverride) { // mock the store config