diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/DictionaryUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/DictionaryUtils.java
index eb9d79584b..d95689807d 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/DictionaryUtils.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/DictionaryUtils.java
@@ -54,7 +54,7 @@ public static ByteBuffer readDictionaryFromKafka(String topicName, VenicePropert
    * @return The compression dictionary wrapped in a ByteBuffer, or null if no dictionary was present in the
    *         Start Of Push message.
    */
-  public static ByteBuffer readDictionaryFromKafka(
+  static ByteBuffer readDictionaryFromKafka(
       String topicName,
       PubSubConsumerAdapter pubSubConsumer,
       PubSubTopicRepository pubSubTopicRepository) {
@@ -70,6 +70,11 @@ public static ByteBuffer readDictionaryFromKafka(
     while (!startOfPushReceived) {
       Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> messages = pubSubConsumer.poll(10 * Time.MS_PER_SECOND);
+
+      if (!messages.containsKey(pubSubTopicPartition)) {
+        continue;
+      }
+
       for (final PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message: messages.get(pubSubTopicPartition)) {
         kafkaKey = message.getKey();
         kafkaValue = message.getValue();
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java
new file mode 100644
index 0000000000..55b2846a9f
--- /dev/null
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java
@@ -0,0 +1,159 @@
+package com.linkedin.venice.utils;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.linkedin.venice.compression.CompressionStrategy;
+import com.linkedin.venice.kafka.protocol.ControlMessage;
+import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
+import com.linkedin.venice.kafka.protocol.Put;
+import com.linkedin.venice.kafka.protocol.StartOfPush;
+import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
+import com.linkedin.venice.kafka.protocol.enums.MessageType;
+import com.linkedin.venice.message.KafkaKey;
+import com.linkedin.venice.meta.Version;
+import com.linkedin.venice.pubsub.ImmutablePubSubMessage;
+import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
+import com.linkedin.venice.pubsub.PubSubTopicRepository;
+import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
+import com.linkedin.venice.pubsub.api.PubSubMessage;
+import com.linkedin.venice.pubsub.api.PubSubTopic;
+import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class DictionaryUtilsTest {
+  private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
+
+  private PubSubTopic getTopic() {
+    String callingFunction = Thread.currentThread().getStackTrace()[2].getMethodName();
+    return pubSubTopicRepository.getTopic(Version.composeKafkaTopic(Utils.getUniqueString(callingFunction), 1));
+  }
+
+  @Test
+  public void testGetDictionary() {
+    PubSubTopic topic = getTopic();
+    byte[] dictionaryToSend = "TEST_DICT".getBytes();
+
+    PubSubConsumerAdapter pubSubConsumer = mock(PubSubConsumerAdapter.class);
+    PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class);
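+    // readDictionaryFromKafka() resolves the topic name through the repository and polls the
+    // consumer for partition 0, so the stubs below hand it this topic plus a Start Of Push
+    // control message that carries the dictionary payload.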
+    PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0);
+    doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName());
+
+    KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null);
+    StartOfPush startOfPush = new StartOfPush();
+    startOfPush.compressionStrategy = CompressionStrategy.ZSTD_WITH_DICT.getValue();
+    startOfPush.compressionDictionary = ByteBuffer.wrap(dictionaryToSend);
+
+    ControlMessage sopCM = new ControlMessage();
+    sopCM.controlMessageType = ControlMessageType.START_OF_PUSH.getValue();
+    sopCM.controlMessageUnion = startOfPush;
+    KafkaMessageEnvelope sopWithDictionaryValue =
+        new KafkaMessageEnvelope(MessageType.CONTROL_MESSAGE.getValue(), null, sopCM, null);
+    PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> sopWithDictionary =
+        new ImmutablePubSubMessage<>(controlMessageKey, sopWithDictionaryValue, topicPartition, 0L, 0L, 0);
+    doReturn(Collections.singletonMap(topicPartition, Collections.singletonList(sopWithDictionary)))
+        .when(pubSubConsumer)
+        .poll(anyLong());
+
+    ByteBuffer dictionaryFromKafka =
+        DictionaryUtils.readDictionaryFromKafka(topic.getName(), pubSubConsumer, pubSubTopicRepository);
+    Assert.assertEquals(dictionaryFromKafka.array(), dictionaryToSend);
+    verify(pubSubConsumer, times(1)).poll(anyLong());
+  }
+
+  @Test
+  public void testGetDictionaryReturnsNullWhenNoDictionary() {
+    PubSubTopic topic = getTopic();
+
+    PubSubConsumerAdapter pubSubConsumer = mock(PubSubConsumerAdapter.class);
+    PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class);
+    PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0);
+    doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName());
+
+    KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null);
+    StartOfPush startOfPush = new StartOfPush();
+
+    ControlMessage sopCM = new ControlMessage();
+    sopCM.controlMessageType = ControlMessageType.START_OF_PUSH.getValue();
+    sopCM.controlMessageUnion = startOfPush;
+    KafkaMessageEnvelope sopWithDictionaryValue =
+        new KafkaMessageEnvelope(MessageType.CONTROL_MESSAGE.getValue(), null, sopCM, null);
+    PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> sopWithDictionary =
+        new ImmutablePubSubMessage<>(controlMessageKey, sopWithDictionaryValue, topicPartition, 0L, 0L, 0);
+    doReturn(Collections.singletonMap(topicPartition, Collections.singletonList(sopWithDictionary)))
+        .when(pubSubConsumer)
+        .poll(anyLong());
+
+    ByteBuffer dictionaryFromKafka =
+        DictionaryUtils.readDictionaryFromKafka(topic.getName(), pubSubConsumer, pubSubTopicRepository);
+    Assert.assertNull(dictionaryFromKafka);
+    verify(pubSubConsumer, times(1)).poll(anyLong());
+  }
+
+  @Test
+  public void testGetDictionaryReturnsNullWhenNoSOP() {
+    PubSubTopic topic = getTopic();
+
+    PubSubConsumerAdapter pubSubConsumer = mock(PubSubConsumerAdapter.class);
+    PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class);
+    PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0);
+    doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName());
+
+    KafkaKey dataMessageKey = new KafkaKey(MessageType.PUT, "blah".getBytes());
+
+    Put putMessage = new Put();
+    putMessage.putValue = ByteBuffer.wrap("blah".getBytes());
+    putMessage.schemaId = 1;
+    KafkaMessageEnvelope putMessageValue = new KafkaMessageEnvelope(MessageType.PUT.getValue(), null, putMessage, null);
+    PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> sopWithDictionary =
+        new ImmutablePubSubMessage<>(dataMessageKey, putMessageValue, topicPartition, 0L, 0L, 0);
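+    // A plain PUT record arriving before any Start Of Push control message means no dictionary
+    // can follow, so a single poll should be enough for the utility to give up and return null.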
+    doReturn(Collections.singletonMap(topicPartition, Collections.singletonList(sopWithDictionary)))
+        .when(pubSubConsumer)
+        .poll(anyLong());
+
+    ByteBuffer dictionaryFromKafka =
+        DictionaryUtils.readDictionaryFromKafka(topic.getName(), pubSubConsumer, pubSubTopicRepository);
+    Assert.assertNull(dictionaryFromKafka);
+    verify(pubSubConsumer, times(1)).poll(anyLong());
+  }
+
+  @Test
+  public void testGetDictionaryWaitsTillTopicHasRecords() {
+    PubSubTopic topic = getTopic();
+    byte[] dictionaryToSend = "TEST_DICT".getBytes();
+
+    PubSubConsumerAdapter pubSubConsumer = mock(PubSubConsumerAdapter.class);
+    PubSubTopicRepository pubSubTopicRepository = mock(PubSubTopicRepository.class);
+    PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0);
+    doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName());
+
+    KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null);
+    StartOfPush startOfPush = new StartOfPush();
+    startOfPush.compressionStrategy = CompressionStrategy.ZSTD_WITH_DICT.getValue();
+    startOfPush.compressionDictionary = ByteBuffer.wrap(dictionaryToSend);
+
+    ControlMessage sopCM = new ControlMessage();
+    sopCM.controlMessageType = ControlMessageType.START_OF_PUSH.getValue();
+    sopCM.controlMessageUnion = startOfPush;
+    KafkaMessageEnvelope sopWithDictionaryValue =
+        new KafkaMessageEnvelope(MessageType.CONTROL_MESSAGE.getValue(), null, sopCM, null);
+    PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> sopWithDictionary =
+        new ImmutablePubSubMessage<>(controlMessageKey, sopWithDictionaryValue, topicPartition, 0L, 0L, 0);
+    // The first poll returns no records for the partition; the second delivers the SOP. The
+    // utility is expected to keep polling instead of failing on the empty result.
+    doReturn(Collections.emptyMap())
+        .doReturn(Collections.singletonMap(topicPartition, Collections.singletonList(sopWithDictionary)))
+        .when(pubSubConsumer)
+        .poll(anyLong());
+
+    ByteBuffer dictionaryFromKafka =
+        DictionaryUtils.readDictionaryFromKafka(topic.getName(), pubSubConsumer, pubSubTopicRepository);
+    Assert.assertEquals(dictionaryFromKafka.array(), dictionaryToSend);
+    verify(pubSubConsumer, times(2)).poll(anyLong());
+  }
+}
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/TestDictionaryUtils.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/TestDictionaryUtils.java
deleted file mode 100644
index 620e5d2f4a..0000000000
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/TestDictionaryUtils.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package com.linkedin.venice.utils;
-
-import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.STANDALONE_REGION_NAME;
-import static com.linkedin.venice.kafka.TopicManager.DEFAULT_KAFKA_OPERATION_TIMEOUT_MS;
-
-import com.linkedin.venice.ConfigKeys;
-import com.linkedin.venice.compression.CompressionStrategy;
-import com.linkedin.venice.integration.utils.PubSubBrokerConfigs;
-import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
-import com.linkedin.venice.integration.utils.ServiceFactory;
-import com.linkedin.venice.kafka.TopicManager;
-import com.linkedin.venice.kafka.protocol.enums.MessageType;
-import com.linkedin.venice.message.KafkaKey;
-import com.linkedin.venice.meta.Version;
-import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
-import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
-import com.linkedin.venice.pubsub.PubSubTopicRepository;
-import com.linkedin.venice.pubsub.api.PubSubTopic;
-import com.linkedin.venice.writer.VeniceWriter;
-import com.linkedin.venice.writer.VeniceWriterOptions;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-public class TestDictionaryUtils {
-  private static final long MIN_COMPACTION_LAG = 24 * Time.MS_PER_HOUR;
-
-  /** Wait time for {@link #manager} operations, in seconds */
-  private static final int WAIT_TIME = 10;
-  private static final int PARTITION_COUNT = 1;
-  private PubSubBrokerWrapper pubSubBrokerWrapper;
-  private TopicManager manager;
-  private TestMockTime mockTime;
-  private PubSubProducerAdapterFactory pubSubProducerAdapterFactory;
-  private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
-
-  private String getTopic() {
-    String callingFunction = Thread.currentThread().getStackTrace()[2].getMethodName();
-    PubSubTopic pubSubTopic =
-        pubSubTopicRepository.getTopic(Version.composeKafkaTopic(Utils.getUniqueString(callingFunction), 1));
-    int replicas = 1;
-    manager.createTopic(pubSubTopic, PARTITION_COUNT, replicas, false);
-    TestUtils.waitForNonDeterministicAssertion(
-        WAIT_TIME,
-        TimeUnit.SECONDS,
-        () -> Assert.assertTrue(manager.containsTopicAndAllPartitionsAreOnline(pubSubTopic)));
-    return pubSubTopic.getName();
-  }
-
-  private Properties getKafkaProperties() {
-    Properties props = new Properties();
-    props.put(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, manager.getPubSubBootstrapServers());
-    props.put(ConfigKeys.PARTITIONER_CLASS, DefaultVenicePartitioner.class.getName());
-    return props;
-  }
-
-  @BeforeClass
-  public void setUp() {
-    mockTime = new TestMockTime();
-    pubSubBrokerWrapper = ServiceFactory.getPubSubBroker(
-        new PubSubBrokerConfigs.Builder().setMockTime(mockTime).setRegionName(STANDALONE_REGION_NAME).build());
-    manager =
-        IntegrationTestPushUtils
-            .getTopicManagerRepo(
-                DEFAULT_KAFKA_OPERATION_TIMEOUT_MS,
-                100,
-                MIN_COMPACTION_LAG,
-                pubSubBrokerWrapper,
-                pubSubTopicRepository)
-            .getTopicManager();
-    pubSubProducerAdapterFactory = pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
-  }
-
-  @AfterClass
-  public void cleanUp() throws IOException {
-    pubSubBrokerWrapper.close();
-    manager.close();
-  }
-
-  @Test
-  public void testGetDictionary() {
-    String topic = getTopic();
-    byte[] dictionaryToSend = "TEST_DICT".getBytes();
-    Properties props = getKafkaProperties();
-
-    try (VeniceWriter<KafkaKey, byte[], byte[]> veniceWriter =
-        TestUtils.getVeniceWriterFactory(props, pubSubProducerAdapterFactory)
-            .createVeniceWriter(
-                new VeniceWriterOptions.Builder(topic).setUseKafkaKeySerializer(true)
-                    .setPartitionCount(PARTITION_COUNT)
-                    .build())) {
-      veniceWriter.broadcastStartOfPush(
-          true,
-          false,
-          CompressionStrategy.ZSTD_WITH_DICT,
-          Optional.of(ByteBuffer.wrap(dictionaryToSend)),
-          null);
-      veniceWriter.broadcastEndOfPush(null);
-    }
-
-    ByteBuffer dictionaryFromKafka = DictionaryUtils.readDictionaryFromKafka(topic, new VeniceProperties(props));
-    Assert.assertEquals(dictionaryFromKafka.array(), dictionaryToSend);
-  }
-
-  @Test
-  public void testGetDictionaryReturnsNullWhenNoDictionary() {
-    String topic = getTopic();
-    Properties props = getKafkaProperties();
-
-    try (VeniceWriter<KafkaKey, byte[], byte[]> veniceWriter =
-        TestUtils.getVeniceWriterFactory(props, pubSubProducerAdapterFactory)
-            .createVeniceWriter(
-                new VeniceWriterOptions.Builder(topic).setUseKafkaKeySerializer(true)
-                    .setPartitionCount(PARTITION_COUNT)
-                    .build())) {
-      veniceWriter.broadcastStartOfPush(true, false, CompressionStrategy.ZSTD_WITH_DICT, null);
-      veniceWriter.broadcastEndOfPush(null);
-    }
-
-    ByteBuffer dictionaryFromKafka = DictionaryUtils.readDictionaryFromKafka(topic, new VeniceProperties(props));
-    Assert.assertNull(dictionaryFromKafka);
-  }
-
-  @Test
-  public void testGetDictionaryReturnsNullWhenNoSOP() {
-    String topic = getTopic();
-    Properties props = getKafkaProperties();
-
-    try (VeniceWriter<KafkaKey, byte[], byte[]> veniceWriter =
-        TestUtils.getVeniceWriterFactory(props, pubSubProducerAdapterFactory)
-            .createVeniceWriter(
-                new VeniceWriterOptions.Builder(topic).setUseKafkaKeySerializer(true)
-                    .setPartitionCount(PARTITION_COUNT)
-                    .build())) {
-      veniceWriter.put(new KafkaKey(MessageType.PUT, "blah".getBytes()), "blah".getBytes(), 1, null);
-    }
-
-    ByteBuffer dictionaryFromKafka = DictionaryUtils.readDictionaryFromKafka(topic, new VeniceProperties(props));
-    Assert.assertNull(dictionaryFromKafka);
-  }
-}