Skip to content

Commit

Permalink
[server] Complete the corresponding consumer action future in SIT clo…
Browse files Browse the repository at this point in the history
…se (#1391)

In SIT internal close we drain the consumer action queue and process DROP_PARTITION and
RESET_OFFSET messages. However, when we process them we did not complete their corresponding
future. This breaks any logic that are waiting for those futures and create false positive
timeout errors.
  • Loading branch information
xunyin8 authored Dec 13, 2024
1 parent bf6eead commit a15f3c5
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1803,26 +1803,34 @@ private void internalClose(boolean doFlush) {

// Only reset Offset and Drop Partition Messages are important, subscribe/unsubscribe will be handled
// on the restart by Helix Controller notifications on the new StoreIngestionTask.
try {
this.storeRepository.unregisterStoreDataChangedListener(this.storageUtilizationManager);
for (ConsumerAction message: consumerActionsQueue) {
ConsumerActionType opType = message.getType();
String topic = message.getTopic();
int partition = message.getPartition();
String replica = Utils.getReplicaId(message.getTopic(), message.getPartition());
this.storeRepository.unregisterStoreDataChangedListener(this.storageUtilizationManager);
for (ConsumerAction message: consumerActionsQueue) {
ConsumerActionType opType = message.getType();
String topic = message.getTopic();
int partition = message.getPartition();
String replica = Utils.getReplicaId(message.getTopic(), message.getPartition());
try {
if (opType == ConsumerActionType.RESET_OFFSET) {
LOGGER.info("Cleanup Reset OffSet. Replica: {}", replica);
storageMetadataService.clearOffset(topic, partition);
message.getFuture().complete(null);
} else if (opType == DROP_PARTITION) {
PubSubTopicPartition topicPartition = message.getTopicPartition();
LOGGER.info("Processing DROP_PARTITION message for {} in internalClose", topicPartition);
dropPartitionSynchronously(topicPartition);
message.getFuture().complete(null);
} else {
LOGGER.info("Cleanup ignoring the Message: {} Replica: {}", message, replica);
}
} catch (Exception e) {
LOGGER.error(
"{} Error while handling consumer action: {} replica: {} in internalClose",
ingestionTaskName,
message,
replica,
e);
message.getFuture().completeExceptionally(e);
}
} catch (Exception e) {
LOGGER.error("{} Error while handling message in internalClose", ingestionTaskName, e);
}
// Unsubscribe any topic partitions related to this version topic from the shared consumer.
aggKafkaConsumerService.unsubscribeAll(versionTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5249,6 +5249,25 @@ public void testUnsubscribeFromTopic() {
verify(storeIngestionTask, times(2)).consumerUnSubscribeForStateTransition(versionTopic, pcs);
}

@Test(timeOut = 10 * Time.MS_PER_SECOND)
public void testStoreIngestionInternalClose() throws Exception {
AtomicReference<CompletableFuture<Void>> dropPartitionFuture = new AtomicReference<>();
StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> {
dropPartitionFuture.set(storeIngestionTaskUnderTest.dropStoragePartitionGracefully(fooTopicPartition));
}, AA_OFF);
// Mock out processConsumerActions to ensure the consumer action queue is not processed
config.setBeforeStartingConsumption(() -> {
try {
doNothing().when(storeIngestionTaskUnderTest).processConsumerActions(any());
} catch (InterruptedException e) {
// ignored
}
});
runTest(config);
// The drop partition consumer action should still be handled as part of internalClose
dropPartitionFuture.get().get();
}

private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig(
Consumer<VeniceStoreVersionConfig> storeVersionConfigOverride) {
// mock the store config
Expand Down

0 comments on commit a15f3c5

Please sign in to comment.