Skip to content

Commit

Permalink
[server] Complete the corresponding consumer action future in SIT close
Browse files Browse the repository at this point in the history
In SIT internal close we drain the consumer action queue and process DROP_PARTITION and
RESET_OFFSET messages. However, when we process them we did not complete their corresponding
future. This breaks any logic that are waiting for those futures and create false positive
timeout errors.
  • Loading branch information
xunyin8 committed Dec 13, 2024
1 parent 49dc236 commit 8b70947
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1803,26 +1803,34 @@ private void internalClose(boolean doFlush) {

// Only reset Offset and Drop Partition Messages are important, subscribe/unsubscribe will be handled
// on the restart by Helix Controller notifications on the new StoreIngestionTask.
try {
this.storeRepository.unregisterStoreDataChangedListener(this.storageUtilizationManager);
for (ConsumerAction message: consumerActionsQueue) {
ConsumerActionType opType = message.getType();
String topic = message.getTopic();
int partition = message.getPartition();
String replica = Utils.getReplicaId(message.getTopic(), message.getPartition());
this.storeRepository.unregisterStoreDataChangedListener(this.storageUtilizationManager);
for (ConsumerAction message: consumerActionsQueue) {
ConsumerActionType opType = message.getType();
String topic = message.getTopic();
int partition = message.getPartition();
String replica = Utils.getReplicaId(message.getTopic(), message.getPartition());
try {
if (opType == ConsumerActionType.RESET_OFFSET) {
LOGGER.info("Cleanup Reset OffSet. Replica: {}", replica);
storageMetadataService.clearOffset(topic, partition);
message.getFuture().complete(null);
} else if (opType == DROP_PARTITION) {
PubSubTopicPartition topicPartition = message.getTopicPartition();
LOGGER.info("Processing DROP_PARTITION message for {} in internalClose", topicPartition);
dropPartitionSynchronously(topicPartition);
message.getFuture().complete(null);
} else {
LOGGER.info("Cleanup ignoring the Message: {} Replica: {}", message, replica);
}
} catch (Exception e) {
LOGGER.error(
"{} Error while handling consumer action: {} replica: {} in internalClose",
ingestionTaskName,
message,
replica,
e);
message.getFuture().completeExceptionally(e);
}
} catch (Exception e) {
LOGGER.error("{} Error while handling message in internalClose", ingestionTaskName, e);
}
// Unsubscribe any topic partitions related to this version topic from the shared consumer.
aggKafkaConsumerService.unsubscribeAll(versionTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5249,6 +5249,25 @@ public void testUnsubscribeFromTopic() {
verify(storeIngestionTask, times(2)).consumerUnSubscribeForStateTransition(versionTopic, pcs);
}

@Test(timeOut = 10 * Time.MS_PER_SECOND)
public void testStoreIngestionInternalClose() throws Exception {
AtomicReference<CompletableFuture<Void>> dropPartitionFuture = new AtomicReference<>();
StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> {
dropPartitionFuture.set(storeIngestionTaskUnderTest.dropStoragePartitionGracefully(fooTopicPartition));
}, AA_OFF);
// Mock out processConsumerActions to ensure the consumer action queue is not processed
config.setBeforeStartingConsumption(() -> {
try {
doNothing().when(storeIngestionTaskUnderTest).processConsumerActions(any());
} catch (InterruptedException e) {
// ignored
}
});
runTest(config);
// The drop partition consumer action should still be handled as part of internalClose
dropPartitionFuture.get().get();
}

private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig(
Consumer<VeniceStoreVersionConfig> storeVersionConfigOverride) {
// mock the store config
Expand Down

0 comments on commit 8b70947

Please sign in to comment.