diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java index b16ffe7bfe..caa3e6e003 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java @@ -5,6 +5,7 @@ import com.linkedin.davinci.kafka.consumer.StoreIngestionService; import com.linkedin.davinci.notifier.VeniceNotifier; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.exceptions.VeniceTimeoutException; import com.linkedin.venice.meta.Version; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.util.Map; @@ -51,7 +52,7 @@ void waitConsumptionCompleted( // Report ingestion_failure String storeName = Version.parseStoreFromKafkaTopicName(resourceName); storeIngestionService.recordIngestionFailure(storeName); - VeniceException veniceException = new VeniceException(errorMsg); + VeniceTimeoutException veniceException = new VeniceTimeoutException(errorMsg); storeIngestionService.getStoreIngestionTask(resourceName).reportError(errorMsg, partitionId, veniceException); } stateModelToIngestionCompleteFlagMap.remove(stateModelId); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 65d7c95fce..2483370b19 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1405,8 +1405,8 @@ private void processIngestionException() { /** * Special handling for current version when encountering {@link MemoryLimitExhaustedException}. */ - if (ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class) - && isCurrentVersion.getAsBoolean()) { + if (isCurrentVersion.getAsBoolean() + && ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)) { LOGGER.warn( "Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and" + " resume the consumption after killing ingestion tasks for non current versions"); @@ -1439,10 +1439,23 @@ private void processIngestionException() { // DaVinci is always a follower. subscribePartition(pubSubTopicPartition, false); } + } else if (isCurrentVersion.getAsBoolean() && resetErrorReplicaEnabled && !isDaVinciClient) { + // marking its replica status ERROR which will later be reset by the controller + zkHelixAdmin.get() + .setPartitionsToError( + serverConfig.getClusterName(), + hostName, + kafkaVersionTopic, + Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition))); + LOGGER.error( + "Marking current version replica status to ERROR for replica: {}", + Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), + partitionException); + // No need to reset again, clearing out the exception. + partitionIngestionExceptionList.set(exceptionPartition, null); } else { if (!partitionConsumptionState.isCompletionReported()) { reportError(partitionException.getMessage(), exceptionPartition, partitionException); - } else { LOGGER.error( "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", @@ -1790,6 +1803,16 @@ private void reportError( } else { ingestionNotificationDispatcher.reportError(pcsList, message, consumerEx); } + // Set the replica state to ERROR so that the controller can attempt to reset the partition. + if (isCurrentVersion.getAsBoolean() && !isDaVinciClient && resetErrorReplicaEnabled + && !(consumerEx instanceof VeniceTimeoutException)) { + zkHelixAdmin.get() + .setPartitionsToError( + serverConfig.getClusterName(), + hostName, + kafkaVersionTopic, + Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, partitionId))); + } } private void internalClose(boolean doFlush) { @@ -4222,16 +4245,8 @@ public void reportError(String message, int userPartition, Exception e) { if (partitionConsumptionStateMap.containsKey(userPartition)) { pcsList.add(partitionConsumptionStateMap.get(userPartition)); } + reportError(pcsList, userPartition, message, e); ingestionNotificationDispatcher.reportError(pcsList, message, e); - // Set the replica state to ERROR so that the controller can attempt to reset the partition. - if (!isDaVinciClient && resetErrorReplicaEnabled) { - zkHelixAdmin.get() - .setPartitionsToError( - serverConfig.getClusterName(), - hostName, - kafkaVersionTopic, - Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, userPartition))); - } } public boolean isActiveActiveReplicationEnabled() { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index ad24ec45c8..d894c8b33b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -27,6 +27,7 @@ import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_RECORD_LEVEL_METRICS_WHEN_BOOTSTRAPPING_CURRENT_VERSION_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_CONSUMER_CONFIG_PREFIX; +import static com.linkedin.venice.ConfigKeys.SERVER_RESET_ERROR_REPLICA_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH; import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS; @@ -46,6 +47,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anyString; @@ -385,6 +387,8 @@ public static Object[][] sortedInputAndAAConfigProvider() { private Runnable runnableForKillNonCurrentVersion; + private ZKHelixAdmin zkHelixAdmin; + private static final int PARTITION_COUNT = 10; private static final Set ALL_PARTITIONS = new HashSet<>(); static { @@ -858,6 +862,8 @@ private void runTest( Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); + zkHelixAdmin = mock(ZKHelixAdmin.class); + doNothing().when(zkHelixAdmin).setPartitionsToError(anyString(), anyString(), anyString(), anyList()); storeIngestionTaskUnderTest = spy( ingestionTaskFactory.getNewIngestionTask( storageService, @@ -870,7 +876,7 @@ private void runTest( false, Optional.empty(), recordTransformerFunction, - Lazy.of(() -> mock(ZKHelixAdmin.class)))); + Lazy.of(() -> zkHelixAdmin))); Future testSubscribeTaskFuture = null; try { @@ -2830,6 +2836,7 @@ private VeniceServerConfig buildVeniceServerConfig(Map extraProp propertyBuilder.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 1000); propertyBuilder.put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 1000); propertyBuilder.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true); + propertyBuilder.put(SERVER_RESET_ERROR_REPLICA_ENABLED, true); extraProperties.forEach(propertyBuilder::put); Map> kafkaClusterMap = new HashMap<>(); @@ -4371,8 +4378,7 @@ public void testIngestionTaskForNonCurrentVersionShouldFailWhenEncounteringMemor } @Test - public void testIngestionTaskForCurrentVersionShouldTryToKillOngoingPushWhenEncounteringMemoryLimitException() - throws Exception { + public void testIngestionTaskCurrentVersionKillOngoingPushOnMemoryLimitException() throws Exception { doThrow(new MemoryLimitExhaustedException("mock exception")).doNothing() .when(mockAbstractStorageEngine) .put(anyInt(), any(), (ByteBuffer) any()); @@ -4391,6 +4397,30 @@ public void testIngestionTaskForCurrentVersionShouldTryToKillOngoingPushWhenEnco }, AA_OFF); } + @Test + public void testIngestionTaskForCurrentVersionResetException() throws Exception { + doThrow(new VeniceException("mock exception")).doNothing() + .when(mockAbstractStorageEngine) + .put(anyInt(), any(), (ByteBuffer) any()); + isCurrentVersion = () -> true; + + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, EXISTING_SCHEMA_ID, PUT_KEY_FOO_TIMESTAMP, null).get(); + localVeniceWriter.broadcastEndOfPush(new HashMap<>()); + + StoreIngestionTaskTestConfig testConfig = + new StoreIngestionTaskTestConfig(Collections.singleton(PARTITION_FOO), () -> { + // pcs.completionReported(); + verify(mockAbstractStorageEngine, timeout(10000).times(1)).put(eq(PARTITION_FOO), any(), (ByteBuffer) any()); + Utils.sleep(1000); + verify(zkHelixAdmin, atLeast(1)).setPartitionsToError(anyString(), anyString(), anyString(), anyList()); + }, AA_OFF); + testConfig.setStoreVersionConfigOverride(configOverride -> { + doReturn(true).when(configOverride).isResetErrorReplicaEnabled(); + }); + runTest(testConfig); + } + @Test public void testShouldProduceToVersionTopic() throws Exception { runTest(Collections.singleton(PARTITION_FOO), () -> {