diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 15da4c949a..3afe2e75ba 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1453,18 +1453,20 @@ private void processIngestionException() { partitionException); // No need to reset again, clearing out the exception. partitionIngestionExceptionList.set(exceptionPartition, null); - } else if (!partitionConsumptionState.isCompletionReported()) { - reportError(partitionException.getMessage(), exceptionPartition, partitionException); } else { - LOGGER.error( - "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", - Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), - partitionException); - } - // Unsubscribe the partition to avoid more damages. - if (partitionConsumptionStateMap.containsKey(exceptionPartition)) { - // This is not an unsubscribe action from Helix - unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition), false); + if (!partitionConsumptionState.isCompletionReported()) { + reportError(partitionException.getMessage(), exceptionPartition, partitionException); + } else { + LOGGER.error( + "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", + Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), + partitionException); + } + // Unsubscribe the partition to avoid more damages. + if (partitionConsumptionStateMap.containsKey(exceptionPartition)) { + // This is not an unsubscribe action from Helix + unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition), false); + } } } });