Skip to content

Commit

Permalink
[server] Fix NPE in StoreAwarePartitionWiseKafkaConsumerService#handl…
Browse files Browse the repository at this point in the history
…eUnsubscription (#1402)

This PR only fixes NPE exception in StoreAwarePartitionWiseKafkaConsumerService#handleUnsubscription. 2nd call to the same TP unsubscription will be ignored, as expected, as it should be decrease twice.

There is still race condition in upstream call where unsubscribe and unsubscribeAll from SIT/Participant thread can dual unsubscribe on the same TP, due to race condition in certain map bookkeeping operation. This PR does not fix that, due to other unclear performance concern. It is harmless for now, as 2nd call to the same TP will be ignored by consumer.
  • Loading branch information
sixpluszero authored Dec 17, 2024
1 parent 3f0bcb5 commit c34eb10
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,17 @@ public synchronized void batchUnsubscribe(Set<PubSubTopicPartition> pubSubTopicP
*/
protected synchronized void unSubscribeAction(Supplier<Set<PubSubTopicPartition>> supplier, long timeoutMs) {
long currentPollTimes = pollTimes;
Set<PubSubTopicPartition> topicPartitions = supplier.get();
long startTime = System.currentTimeMillis();
Set<PubSubTopicPartition> topicPartitions = supplier.get();
updateCurrentAssignment(delegate.getAssignment());
waitAfterUnsubscribe(currentPollTimes, topicPartitions, timeoutMs);
long elapsedTime = System.currentTimeMillis() - startTime;
LOGGER.info(
"Shared consumer {} unsubscribed {} partition(s): ({}) in {} ms",
this.getClass().getSimpleName(),
topicPartitions.size(),
topicPartitions,
elapsedTime);
updateCurrentAssignment(delegate.getAssignment());
waitAfterUnsubscribe(currentPollTimes, topicPartitions, timeoutMs);
}

protected void waitAfterUnsubscribe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void handleUnsubscription(
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition) {
super.handleUnsubscription(consumer, versionTopic, pubSubTopicPartition);
decreaseConsumerStoreLoad(consumer, versionTopic.getStoreName());
decreaseConsumerStoreLoad(consumer, versionTopic);
}

int getConsumerStoreLoad(SharedKafkaConsumer consumer, String storeName) {
Expand All @@ -138,7 +138,20 @@ void increaseConsumerStoreLoad(SharedKafkaConsumer consumer, String storeName) {
.compute(storeName, (k, v) -> (v == null) ? 1 : v + 1);
}

void decreaseConsumerStoreLoad(SharedKafkaConsumer consumer, String storeName) {
void decreaseConsumerStoreLoad(SharedKafkaConsumer consumer, PubSubTopic versionTopic) {
/**
* When versionTopic is null, it means a specific Topic-Partition has been unsubscribed for more than 1 time. This
* can happen during version deprecation, where {@link ParticipantStoreConsumptionTask} is also trying to unsubscribe
* every partitions
*/
if (versionTopic == null) {
getLOGGER().warn(
"Incoming versionTopic is null, will skip decreasing store load for consumer: {} with index: {}",
consumer,
getConsumerToConsumptionTask().indexOf(consumer));
return;
}
String storeName = versionTopic.getStoreName();
if (!getConsumerToBaseLoadCount().containsKey(consumer)) {
throw new IllegalStateException("Consumer to base load count map does not contain consumer: " + consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private KafkaConsumerService getKafkaConsumerServiceWithSingleConsumer(
poolType,
factory,
properties,
1000l,
1000L,
1,
mockIngestionThrottler,
mock(KafkaClusterBasedRecordThrottler.class),
Expand Down Expand Up @@ -436,6 +436,8 @@ public void testStoreAwarePartitionWiseGetConsumer() {
PubSubTopic pubSubTopicForStoreName3 = pubSubTopicRepository.getTopic(topicForStoreName3);

String storeName4 = Utils.getUniqueString("test_consumer_service4");
String topicForStoreName4 = Version.composeKafkaTopic(storeName4, 1);
PubSubTopic pubSubTopicForStoreName4 = pubSubTopicRepository.getTopic(topicForStoreName4);

SharedKafkaConsumer consumer1 = mock(SharedKafkaConsumer.class);
SharedKafkaConsumer consumer2 = mock(SharedKafkaConsumer.class);
Expand All @@ -461,9 +463,9 @@ public void testStoreAwarePartitionWiseGetConsumer() {
when(consumerService.getLOGGER())
.thenReturn(LogManager.getLogger(StoreAwarePartitionWiseKafkaConsumerService.class));
doCallRealMethod().when(consumerService).pickConsumerForPartition(any(), any());
doCallRealMethod().when(consumerService).getConsumerStoreLoad(any(), anyString());
doCallRealMethod().when(consumerService).increaseConsumerStoreLoad(any(), anyString());
doCallRealMethod().when(consumerService).decreaseConsumerStoreLoad(any(), anyString());
doCallRealMethod().when(consumerService).getConsumerStoreLoad(any(), any());
doCallRealMethod().when(consumerService).increaseConsumerStoreLoad(any(), any());
doCallRealMethod().when(consumerService).decreaseConsumerStoreLoad(any(), any());

consumerToBasicLoadMap.put(consumer1, 1);
Map<String, Integer> innerMap1 = new VeniceConcurrentHashMap<>();
Expand Down Expand Up @@ -508,25 +510,28 @@ public void testStoreAwarePartitionWiseGetConsumer() {
Assert.assertEquals(consumerService.getConsumerStoreLoad(consumer1, storeName3), 10003);

// Validate decrease consumer entry
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, storeName4));
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName4));

consumerService.decreaseConsumerStoreLoad(consumer1, storeName1);
consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName1);
Assert.assertEquals(consumerToBasicLoadMap.get(consumer1).intValue(), 2);
Assert.assertNull(consumerToStoreLoadMap.get(consumer1).get(storeName1));
Assert.assertEquals(consumerService.getConsumerStoreLoad(consumer1, storeName1), 2);
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, storeName1));
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName1));

consumerService.decreaseConsumerStoreLoad(consumer1, storeName2);
consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName2);
Assert.assertEquals(consumerToBasicLoadMap.get(consumer1).intValue(), 1);
Assert.assertNull(consumerToStoreLoadMap.get(consumer1).get(storeName2));
Assert.assertEquals(consumerService.getConsumerStoreLoad(consumer1, storeName2), 1);
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, storeName2));
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName2));

consumerService.decreaseConsumerStoreLoad(consumer1, storeName3);
consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName3);
Assert.assertNull(consumerToBasicLoadMap.get(consumer1));
Assert.assertNull(consumerToStoreLoadMap.get(consumer1));
Assert.assertEquals(consumerService.getConsumerStoreLoad(consumer1, storeName3), 0);
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, storeName3));
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName3));

// Make sure invalid versionTopic won't throw NPE.
consumerService.decreaseConsumerStoreLoad(consumer1, null);

// Validate increase consumer entry
consumerService.increaseConsumerStoreLoad(consumer1, storeName1);
Expand Down

0 comments on commit c34eb10

Please sign in to comment.