Skip to content

Commit

Permalink
[server] Fix NPE in StoreAwarePartitionWiseKafkaConsumerService#handl…
Browse files Browse the repository at this point in the history
…eUnsubscription
  • Loading branch information
sixpluszero committed Dec 17, 2024
1 parent ed335b8 commit 51105f1
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ public synchronized void batchUnsubscribe(Set<PubSubTopicPartition> pubSubTopicP
*/
protected synchronized void unSubscribeAction(Supplier<Set<PubSubTopicPartition>> supplier, long timeoutMs) {
long currentPollTimes = pollTimes;
Set<PubSubTopicPartition> topicPartitions = supplier.get();
long startTime = System.currentTimeMillis();
Set<PubSubTopicPartition> topicPartitions = supplier.get();
long elapsedTime = System.currentTimeMillis() - startTime;
LOGGER.info(
"Shared consumer {} unsubscribed {} partition(s): ({}) in {} ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void handleUnsubscription(
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition) {
super.handleUnsubscription(consumer, versionTopic, pubSubTopicPartition);
decreaseConsumerStoreLoad(consumer, versionTopic.getStoreName());
decreaseConsumerStoreLoad(consumer, versionTopic);
}

int getConsumerStoreLoad(SharedKafkaConsumer consumer, String storeName) {
Expand All @@ -138,7 +138,12 @@ void increaseConsumerStoreLoad(SharedKafkaConsumer consumer, String storeName) {
.compute(storeName, (k, v) -> (v == null) ? 1 : v + 1);
}

void decreaseConsumerStoreLoad(SharedKafkaConsumer consumer, String storeName) {
void decreaseConsumerStoreLoad(SharedKafkaConsumer consumer, PubSubTopic versionTopic) {
if (versionTopic == null) {
getLOGGER().warn("Incoming versionTopic is null, will skip decreasing store load for consumer: {}", consumer);
return;
}
String storeName = versionTopic.getStoreName();
if (!getConsumerToBaseLoadCount().containsKey(consumer)) {
throw new IllegalStateException("Consumer to base load count map does not contain consumer: " + consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private KafkaConsumerService getKafkaConsumerServiceWithSingleConsumer(
poolType,
factory,
properties,
1000l,
1000L,
1,
mockIngestionThrottler,
mock(KafkaClusterBasedRecordThrottler.class),
Expand Down Expand Up @@ -436,6 +436,8 @@ public void testStoreAwarePartitionWiseGetConsumer() {
PubSubTopic pubSubTopicForStoreName3 = pubSubTopicRepository.getTopic(topicForStoreName3);

String storeName4 = Utils.getUniqueString("test_consumer_service4");
String topicForStoreName4 = Version.composeKafkaTopic(storeName4, 1);
PubSubTopic pubSubTopicForStoreName4 = pubSubTopicRepository.getTopic(topicForStoreName4);

SharedKafkaConsumer consumer1 = mock(SharedKafkaConsumer.class);
SharedKafkaConsumer consumer2 = mock(SharedKafkaConsumer.class);
Expand All @@ -461,9 +463,9 @@ public void testStoreAwarePartitionWiseGetConsumer() {
when(consumerService.getLOGGER())
.thenReturn(LogManager.getLogger(StoreAwarePartitionWiseKafkaConsumerService.class));
doCallRealMethod().when(consumerService).pickConsumerForPartition(any(), any());
doCallRealMethod().when(consumerService).getConsumerStoreLoad(any(), anyString());
doCallRealMethod().when(consumerService).increaseConsumerStoreLoad(any(), anyString());
doCallRealMethod().when(consumerService).decreaseConsumerStoreLoad(any(), anyString());
doCallRealMethod().when(consumerService).getConsumerStoreLoad(any(), any());
doCallRealMethod().when(consumerService).increaseConsumerStoreLoad(any(), any());
doCallRealMethod().when(consumerService).decreaseConsumerStoreLoad(any(), any());

consumerToBasicLoadMap.put(consumer1, 1);
Map<String, Integer> innerMap1 = new VeniceConcurrentHashMap<>();
Expand Down Expand Up @@ -508,25 +510,28 @@ public void testStoreAwarePartitionWiseGetConsumer() {
Assert.assertEquals(consumerService.getConsumerStoreLoad(consumer1, storeName3), 10003);

// Validate decrease consumer entry
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, storeName4));
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName4));

consumerService.decreaseConsumerStoreLoad(consumer1, storeName1);
consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName1);
Assert.assertEquals(consumerToBasicLoadMap.get(consumer1).intValue(), 2);
Assert.assertNull(consumerToStoreLoadMap.get(consumer1).get(storeName1));
Assert.assertEquals(consumerService.getConsumerStoreLoad(consumer1, storeName1), 2);
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, storeName1));
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName1));

consumerService.decreaseConsumerStoreLoad(consumer1, storeName2);
consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName2);
Assert.assertEquals(consumerToBasicLoadMap.get(consumer1).intValue(), 1);
Assert.assertNull(consumerToStoreLoadMap.get(consumer1).get(storeName2));
Assert.assertEquals(consumerService.getConsumerStoreLoad(consumer1, storeName2), 1);
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, storeName2));
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName2));

consumerService.decreaseConsumerStoreLoad(consumer1, storeName3);
consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName3);
Assert.assertNull(consumerToBasicLoadMap.get(consumer1));
Assert.assertNull(consumerToStoreLoadMap.get(consumer1));
Assert.assertEquals(consumerService.getConsumerStoreLoad(consumer1, storeName3), 0);
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, storeName3));
Assert.assertThrows(() -> consumerService.decreaseConsumerStoreLoad(consumer1, pubSubTopicForStoreName3));

// Make sure invalid versionTopic won't throw NPE.
consumerService.decreaseConsumerStoreLoad(consumer1, null);

// Validate increase consumer entry
consumerService.increaseConsumerStoreLoad(consumer1, storeName1);
Expand Down

0 comments on commit 51105f1

Please sign in to comment.