Skip to content

Commit

Permalink
[server] Reset current version replica to ERROR on ingestion error (#…
Browse files Browse the repository at this point in the history
…1387)

For hybrid stores whose current version is ingesting the real time data from a real time topic, if there is any exception/error thrown during ingestion they would log error message, sit still and stop ingestion, resulting in stale replicas.
The mitigation mechanism is for oncall to manually restart every impacted cluster, which is very labor-intensive.
This PR tries to resolve that by marking such replicas to ERROR state, which later in a controller task to reset such error replicas, will try to initiate new state transition of those replicas which can possibly mitigate such stale replica issue.

---------

Co-authored-by: Sourav Maji <[email protected]>
  • Loading branch information
majisourav99 and Sourav Maji authored Dec 18, 2024
1 parent eb9fe9b commit 543f65c
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.davinci.kafka.consumer.StoreIngestionService;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceTimeoutException;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Map;
Expand Down Expand Up @@ -51,7 +52,7 @@ void waitConsumptionCompleted(
// Report ingestion_failure
String storeName = Version.parseStoreFromKafkaTopicName(resourceName);
storeIngestionService.recordIngestionFailure(storeName);
VeniceException veniceException = new VeniceException(errorMsg);
VeniceTimeoutException veniceException = new VeniceTimeoutException(errorMsg);
storeIngestionService.getStoreIngestionTask(resourceName).reportError(errorMsg, partitionId, veniceException);
}
stateModelToIngestionCompleteFlagMap.remove(stateModelId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1405,8 +1405,8 @@ private void processIngestionException() {
/**
* Special handling for current version when encountering {@link MemoryLimitExhaustedException}.
*/
if (ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)
&& isCurrentVersion.getAsBoolean()) {
if (isCurrentVersion.getAsBoolean()
&& ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)) {
LOGGER.warn(
"Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and"
+ " resume the consumption after killing ingestion tasks for non current versions");
Expand Down Expand Up @@ -1439,10 +1439,23 @@ private void processIngestionException() {
// DaVinci is always a follower.
subscribePartition(pubSubTopicPartition, false);
}
} else if (isCurrentVersion.getAsBoolean() && resetErrorReplicaEnabled && !isDaVinciClient) {
// marking its replica status ERROR which will later be reset by the controller
zkHelixAdmin.get()
.setPartitionsToError(
serverConfig.getClusterName(),
hostName,
kafkaVersionTopic,
Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition)));
LOGGER.error(
"Marking current version replica status to ERROR for replica: {}",
Utils.getReplicaId(kafkaVersionTopic, exceptionPartition),
partitionException);
// No need to reset again, clearing out the exception.
partitionIngestionExceptionList.set(exceptionPartition, null);
} else {
if (!partitionConsumptionState.isCompletionReported()) {
reportError(partitionException.getMessage(), exceptionPartition, partitionException);

} else {
LOGGER.error(
"Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.",
Expand Down Expand Up @@ -1790,6 +1803,16 @@ private void reportError(
} else {
ingestionNotificationDispatcher.reportError(pcsList, message, consumerEx);
}
// Set the replica state to ERROR so that the controller can attempt to reset the partition.
if (isCurrentVersion.getAsBoolean() && !isDaVinciClient && resetErrorReplicaEnabled
&& !(consumerEx instanceof VeniceTimeoutException)) {
zkHelixAdmin.get()
.setPartitionsToError(
serverConfig.getClusterName(),
hostName,
kafkaVersionTopic,
Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, partitionId)));
}
}

private void internalClose(boolean doFlush) {
Expand Down Expand Up @@ -4222,16 +4245,8 @@ public void reportError(String message, int userPartition, Exception e) {
if (partitionConsumptionStateMap.containsKey(userPartition)) {
pcsList.add(partitionConsumptionStateMap.get(userPartition));
}
reportError(pcsList, userPartition, message, e);
ingestionNotificationDispatcher.reportError(pcsList, message, e);
// Set the replica state to ERROR so that the controller can attempt to reset the partition.
if (!isDaVinciClient && resetErrorReplicaEnabled) {
zkHelixAdmin.get()
.setPartitionsToError(
serverConfig.getClusterName(),
hostName,
kafkaVersionTopic,
Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, userPartition)));
}
}

public boolean isActiveActiveReplicationEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_RECORD_LEVEL_METRICS_WHEN_BOOTSTRAPPING_CURRENT_VERSION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_CONSUMER_CONFIG_PREFIX;
import static com.linkedin.venice.ConfigKeys.SERVER_RESET_ERROR_REPLICA_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH;
import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
Expand All @@ -46,6 +47,7 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -385,6 +387,8 @@ public static Object[][] sortedInputAndAAConfigProvider() {

private Runnable runnableForKillNonCurrentVersion;

private ZKHelixAdmin zkHelixAdmin;

private static final int PARTITION_COUNT = 10;
private static final Set<Integer> ALL_PARTITIONS = new HashSet<>();
static {
Expand Down Expand Up @@ -858,6 +862,8 @@ private void runTest(
Properties kafkaProps = new Properties();
kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer());

zkHelixAdmin = mock(ZKHelixAdmin.class);
doNothing().when(zkHelixAdmin).setPartitionsToError(anyString(), anyString(), anyString(), anyList());
storeIngestionTaskUnderTest = spy(
ingestionTaskFactory.getNewIngestionTask(
storageService,
Expand All @@ -870,7 +876,7 @@ private void runTest(
false,
Optional.empty(),
recordTransformerFunction,
Lazy.of(() -> mock(ZKHelixAdmin.class))));
Lazy.of(() -> zkHelixAdmin)));

Future testSubscribeTaskFuture = null;
try {
Expand Down Expand Up @@ -2830,6 +2836,7 @@ private VeniceServerConfig buildVeniceServerConfig(Map<String, Object> extraProp
propertyBuilder.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 1000);
propertyBuilder.put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 1000);
propertyBuilder.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true);
propertyBuilder.put(SERVER_RESET_ERROR_REPLICA_ENABLED, true);
extraProperties.forEach(propertyBuilder::put);

Map<String, Map<String, String>> kafkaClusterMap = new HashMap<>();
Expand Down Expand Up @@ -4371,8 +4378,7 @@ public void testIngestionTaskForNonCurrentVersionShouldFailWhenEncounteringMemor
}

@Test
public void testIngestionTaskForCurrentVersionShouldTryToKillOngoingPushWhenEncounteringMemoryLimitException()
throws Exception {
public void testIngestionTaskCurrentVersionKillOngoingPushOnMemoryLimitException() throws Exception {
doThrow(new MemoryLimitExhaustedException("mock exception")).doNothing()
.when(mockAbstractStorageEngine)
.put(anyInt(), any(), (ByteBuffer) any());
Expand All @@ -4391,6 +4397,30 @@ public void testIngestionTaskForCurrentVersionShouldTryToKillOngoingPushWhenEnco
}, AA_OFF);
}

@Test
public void testIngestionTaskForCurrentVersionResetException() throws Exception {
doThrow(new VeniceException("mock exception")).doNothing()
.when(mockAbstractStorageEngine)
.put(anyInt(), any(), (ByteBuffer) any());
isCurrentVersion = () -> true;

localVeniceWriter.broadcastStartOfPush(new HashMap<>());
localVeniceWriter.put(putKeyFoo, putValue, EXISTING_SCHEMA_ID, PUT_KEY_FOO_TIMESTAMP, null).get();
localVeniceWriter.broadcastEndOfPush(new HashMap<>());

StoreIngestionTaskTestConfig testConfig =
new StoreIngestionTaskTestConfig(Collections.singleton(PARTITION_FOO), () -> {
// pcs.completionReported();
verify(mockAbstractStorageEngine, timeout(10000).times(1)).put(eq(PARTITION_FOO), any(), (ByteBuffer) any());
Utils.sleep(1000);
verify(zkHelixAdmin, atLeast(1)).setPartitionsToError(anyString(), anyString(), anyString(), anyList());
}, AA_OFF);
testConfig.setStoreVersionConfigOverride(configOverride -> {
doReturn(true).when(configOverride).isResetErrorReplicaEnabled();
});
runTest(testConfig);
}

@Test
public void testShouldProduceToVersionTopic() throws Exception {
runTest(Collections.singleton(PARTITION_FOO), () -> {
Expand Down

0 comments on commit 543f65c

Please sign in to comment.