From 0ccdb01cc7b7c1a401af211abfc05acbc98ec05b Mon Sep 17 00:00:00 2001 From: "avinash.v" Date: Thu, 4 May 2023 14:06:50 +0530 Subject: [PATCH 1/2] Passing completeTopicName instead of topic name to create TopicMessageIdImpl in doCommitOffsets method --- .../org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java index 83fb86a..53b01fb 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java @@ -521,7 +521,8 @@ private CompletableFuture doCommitOffsets(Map Date: Sun, 7 May 2023 21:23:41 +0530 Subject: [PATCH 2/2] unit tests for testCommitSync --- pom.xml | 1 + .../consumer/PulsarKafkaConsumerTest.java | 147 ++++++++++++++++++ 2 files changed, 148 insertions(+) create mode 100644 pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java diff --git a/pom.xml b/pom.xml index 3527230..609e431 100644 --- a/pom.xml +++ b/pom.xml @@ -853,6 +853,7 @@ --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/sun.net=ALL-UNNAMED + --add-opens java.base/java.time=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java new file mode 100644 index 0000000..ff254d6 --- /dev/null +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumerTest.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; +import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig; +import org.apache.pulsar.client.util.MessageIdUtils; +import org.apache.pulsar.common.naming.TopicName; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.testng.IObjectFactory; +import org.testng.annotations.ObjectFactory; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +@PrepareForTest({PulsarClientKafkaConfig.class, PulsarConsumerKafkaConfig.class}) +@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.consumer.ConsumerInterceptor"}) +public class PulsarKafkaConsumerTest { + + @ObjectFactory + // Necessary to make PowerMockito.mockStatic work with TestNG. + public IObjectFactory getObjectFactory() { + return new org.powermock.modules.testng.PowerMockObjectFactory(); + } + + @Test + public void testCommitSync() throws PulsarClientException { + + String topic = "persistent://prop/ns/t1"; + + ClientBuilder mockClientBuilder = mock(ClientBuilder.class); + ConsumerBuilder mockConsumerBuilder = mock(ConsumerBuilder.class); + PulsarClientImpl mockPulsarClient = mock(PulsarClientImpl.class); + MultiTopicsConsumerImpl mockMultiTopicsConsumerImpl = mock(MultiTopicsConsumerImpl.class); + ConsumerImpl mockConsumerImpl = mock(ConsumerImpl.class); + + doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString()); + + PowerMockito.mockStatic(PulsarClientKafkaConfig.class); + PowerMockito.mockStatic(PulsarConsumerKafkaConfig.class); + + when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder); + when(PulsarConsumerKafkaConfig.getConsumerBuilder(any(), any())).thenReturn(mockConsumerBuilder); + doReturn(mockPulsarClient).when(mockClientBuilder).build(); + doReturn(mockConsumerBuilder).when(mockConsumerBuilder).clone(); + doReturn(mockConsumerBuilder).when(mockConsumerBuilder).topic(any()); + + + CompletableFuture voidFuture = new CompletableFuture<>(); + voidFuture.complete(null); + doReturn(voidFuture).when(mockMultiTopicsConsumerImpl).acknowledgeCumulativeAsync(any(MessageId.class)); + doReturn(voidFuture).when(mockConsumerImpl).acknowledgeCumulativeAsync(any(MessageId.class)); + + Properties properties = new Properties(); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Collections.singletonList("pulsar://localhost:6650")); + + List partitionCounts = Arrays.asList(0, 2); + + for (Integer count : partitionCounts) { + + CompletableFuture mockConsumerFuture = new CompletableFuture(); + mockConsumerFuture.complete(isTopicPartitioned(count) ? mockMultiTopicsConsumerImpl : mockConsumerImpl); + doReturn(mockConsumerFuture).when(mockConsumerBuilder).subscribeAsync(); + + CompletableFuture mockPartitionFuture = new CompletableFuture(); + mockPartitionFuture.complete(count); + doReturn(mockPartitionFuture).when(mockPulsarClient).getNumberOfPartitions(anyString()); + + Consumer consumer = spy(new PulsarKafkaConsumer<>(properties)); + + Map>> recordMap = getRecords(count, topic); + ConsumerRecords mockRecords = new ConsumerRecords<>(recordMap); + doReturn(mockRecords).when(consumer).poll(any()); + + consumer.subscribe(Collections.singletonList(topic)); + + ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + Map offsets = new HashMap<>(); + records.partitions().forEach(partition -> { + List> partitionRecords = records.records(partition); + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + offsets.put(partition, new OffsetAndMetadata(lastOffset + 1)); + }); + consumer.commitSync(offsets); + + if (isTopicPartitioned(count)) { + for (Map.Entry>> record : recordMap.entrySet()) { + MessageId msgId = MessageIdUtils.getMessageId(record.getValue().get(0).offset() + 1); + String partitionName = TopicName.get(record.getKey().topic()).getPartition(record.getKey().partition()).toString(); + TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl(partitionName, record.getKey().topic(), msgId); + verify(mockMultiTopicsConsumerImpl, times(1)).acknowledgeCumulativeAsync(topicMessageId); + } + } else { + verify(mockConsumerImpl, times(records.partitions().size())).acknowledgeCumulativeAsync(any(MessageId.class)); + } + } + } + + private boolean isTopicPartitioned(int partitionsCount) { + return partitionsCount > 0; + } + + private Map>> getRecords(int partitionsCount, String topic) { + Map>> recordMap = new HashMap<>(); + recordMap.put(new TopicPartition(topic, 0), Collections.singletonList(new ConsumerRecord<>(topic, 0, 10L, "1", "1"))); + for (int i = 1; i < partitionsCount; i++) { + recordMap.put(new TopicPartition(topic, 1), Collections.singletonList(new ConsumerRecord<>(topic, 1, 20L, "2", "1"))); + } + return recordMap; + } +}