From 1c43323d84c04e0940e5b562c7dfcd1267c12c47 Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Thu, 12 Dec 2024 16:01:24 -0800 Subject: [PATCH] fixed tests --- .../kafka/consumer/StoreIngestionTask.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 15da4c949a..3afe2e75ba 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1453,18 +1453,20 @@ private void processIngestionException() { partitionException); // No need to reset again, clearing out the exception. partitionIngestionExceptionList.set(exceptionPartition, null); - } else if (!partitionConsumptionState.isCompletionReported()) { - reportError(partitionException.getMessage(), exceptionPartition, partitionException); } else { - LOGGER.error( - "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", - Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), - partitionException); - } - // Unsubscribe the partition to avoid more damages. - if (partitionConsumptionStateMap.containsKey(exceptionPartition)) { - // This is not an unsubscribe action from Helix - unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition), false); + if (!partitionConsumptionState.isCompletionReported()) { + reportError(partitionException.getMessage(), exceptionPartition, partitionException); + } else { + LOGGER.error( + "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", + Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), + partitionException); + } + // Unsubscribe the partition to avoid more damages. + if (partitionConsumptionStateMap.containsKey(exceptionPartition)) { + // This is not an unsubscribe action from Helix + unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition), false); + } } } });