diff --git a/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java b/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java
index a6e3f35..5569bfd 100644
--- a/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java
+++ b/integration-tests/src/test/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerIntegrationTest.java
@@ -33,6 +33,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.PassThroughConsumerRecord;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
@@ -56,26 +57,40 @@ public void tearDown() {
     super.tearDown();
   }
 
+  @Test
+  public void testPassthroughConsumerReturnRecordType() throws Exception {
+    String topic = "testPassthroughConsumerReturnRecordType";
+    createTopic(topic, 1);
+    Producer producer = createRawProducer();
+    produceRawRecordToTopic(topic, producer);
+
+    Properties extra = new Properties();
+    extra.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+    extra.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    extra.setProperty(ConsumerConfig.ENABLE_SHALLOW_ITERATOR_CONFIG, "true");
+    Properties baseConsumerConfig = getConsumerProperties(extra);
+    LiKafkaInstrumentedConsumerImpl consumer = new LiKafkaInstrumentedConsumerImpl<>(
+        baseConsumerConfig,
+        null,
+        (baseConfig, overrideConfig) -> new LiKafkaConsumerImpl<>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
+        () -> "bob",
+        1);
+
+    consumer.subscribe(Collections.singletonList(topic));
+    KafkaTestUtils.waitUntil("1st record batch", () -> {
+      ConsumerRecords recs = consumer.poll(Duration.ofSeconds(10));
+      return recs.count() > 0 && recs.iterator().next() instanceof PassThroughConsumerRecord;
+    }, 1, 2, TimeUnit.MINUTES, false);
+
+    consumer.close(Duration.ofSeconds(30));
+  }
+
   @Test
   public void testConsumerLiveConfigReload() throws Exception {
     String topic = "testConsumerLiveConfigReload";
     createTopic(topic, 1);
     Producer producer = createRawProducer();
-    for (int i = 0; i < 1000; i++) {
-      byte[] key = new byte[1024];
-      byte[] value = new byte[1024];
-      Arrays.fill(key, (byte) i);
-      Arrays.fill(value, (byte) i);
-      ProducerRecord record = new ProducerRecord<>(topic, 0, key, value);
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      DataOutputStream dos = new DataOutputStream(bos);
-      dos.writeInt(i);
-      dos.close();
-      record.headers().add("recordNum", bos.toByteArray());
-      producer.send(record);
-    }
-    producer.flush();
-    producer.close(1, TimeUnit.MINUTES);
+    produceRawRecordToTopic(topic, producer);
 
     MarioConfiguration marioConfiguration = MarioConfiguration.embeddableInMem();
     marioConfiguration.setEnableNgSupport(false);
@@ -238,4 +253,22 @@ private void createTopic(String topicName, int numPartitions) throws Exception {
       adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, numPartitions, (short) 1))).all().get(1, TimeUnit.MINUTES);
     }
   }
+
+  private void produceRawRecordToTopic(String topicName, Producer producer) throws Exception {
+    for (int i = 0; i < 1000; i++) {
+      byte[] key = new byte[1024];
+      byte[] value = new byte[1024];
+      Arrays.fill(key, (byte) i);
+      Arrays.fill(value, (byte) i);
+      ProducerRecord record = new ProducerRecord<>(topicName, 0, key, value);
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(bos);
+      dos.writeInt(i);
+      dos.close();
+      record.headers().add("recordNum", bos.toByteArray());
+      producer.send(record);
+    }
+    producer.flush();
+    producer.close(1, TimeUnit.MINUTES);
+  }
 }
diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerConfig.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerConfig.java
index e0144f6..d8f8a6e 100644
--- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerConfig.java
+++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerConfig.java
@@ -90,6 +90,10 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
 
   private static final String AUDITOR_CLASS_DOC = "The auditor class to use for the consumer";
 
+  private static final String ENABLE_SHALLOW_ITERATOR_DOC = "If true, the consumer returns raw byte records to the caller directly"
+      + " without performing large message processing. Shallow iteration means the consumer iterates over message batches"
+      + " rather than individual records, returning (potentially compressed) message batches to the caller.";
+
   private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in"
      + " the background.";
 
@@ -169,6 +173,11 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
                 NoOpAuditor.class.getName(),
                 Importance.LOW,
                 AUDITOR_CLASS_DOC)
+        .define(ConsumerConfig.ENABLE_SHALLOW_ITERATOR_CONFIG,
+                Type.BOOLEAN,
+                "false",
+                Importance.LOW,
+                ENABLE_SHALLOW_ITERATOR_DOC)
         .define(ENABLE_AUTO_COMMIT_CONFIG,
                 Type.BOOLEAN,
                 "true",
diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerImpl.java
index 51797ba..f6d1610 100644
--- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerImpl.java
+++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaConsumerImpl.java
@@ -76,6 +76,7 @@ public class LiKafkaConsumerImpl implements LiKafkaConsumer {
   private final ConsumerRecordsProcessor _consumerRecordsProcessor;
   private final LiKafkaConsumerRebalanceListener _consumerRebalanceListener;
   private final LiKafkaOffsetCommitCallback _offsetCommitCallback;
+  private final boolean _passthroughEnabled;
   private final boolean _autoCommitEnabled;
   private final long _autoCommitInterval;
   private final boolean _throwExceptionOnInvalidOffsets;
@@ -119,6 +120,7 @@ private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
 
     _autoCommitEnabled = configs.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
     _autoCommitInterval = configs.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+    _passthroughEnabled = configs.getBoolean(ConsumerConfig.ENABLE_SHALLOW_ITERATOR_CONFIG);
     _throwExceptionOnInvalidOffsets = configs.getBoolean(LiKafkaConsumerConfig.EXCEPTION_ON_INVALID_OFFSET_RESET_CONFIG);
     _offsetResetStrategy =
         LiOffsetResetStrategy.valueOf(configs.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
@@ -356,19 +358,27 @@ private ConsumerRecords poll(long timeout, boolean includeMetadataInTimeou
       }
     }
 
-    _lastProcessedResult = _consumerRecordsProcessor.process(rawRecords);
-    processedRecords = _lastProcessedResult.consumerRecords();
-    // Clear the internal reference.
-    _lastProcessedResult.clearRecords();
-    // Rewind offset if there are processing exceptions.
-    seekToCurrentOffsetsOnRecordProcessingExceptions();
-
-    // this is an optimization
-    // if no records were processed try to throw exception in current poll()
-    if (processedRecords.isEmpty()) {
-      crpe = handleRecordProcessingException(null);
-      if (crpe != null) {
-        throw crpe;
+    /* Hack for passthrough mirroring: Brooklin is the only user of the passthrough feature and it only
+     * uses the raw consumer, so there is no need to perform large message processing in this case;
+     * the consumed records are returned directly to the caller.
+     */
+    if (_passthroughEnabled) {
+      processedRecords = (ConsumerRecords) rawRecords;
+    } else {
+      _lastProcessedResult = _consumerRecordsProcessor.process(rawRecords);
+      processedRecords = _lastProcessedResult.consumerRecords();
+      // Clear the internal reference.
+      _lastProcessedResult.clearRecords();
+      // Rewind offset if there are processing exceptions.
+      seekToCurrentOffsetsOnRecordProcessingExceptions();
+
+      // this is an optimization
+      // if no records were processed try to throw exception in current poll()
+      if (processedRecords.isEmpty()) {
+        crpe = handleRecordProcessingException(null);
+        if (crpe != null) {
+          throw crpe;
+        }
       }
     }
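
Usage sketch (not part of the patch): the snippet below shows how a caller might enable the new pass-through mode on a plain LiKafkaConsumerImpl. The bootstrap address, group id, topic name, and byte-array deserializers are illustrative assumptions; only ConsumerConfig.ENABLE_SHALLOW_ITERATOR_CONFIG and the pass-through behaviour come from this change, and that constant is only present in LinkedIn's Kafka client fork.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import com.linkedin.kafka.clients.consumer.LiKafkaConsumer;
import com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl;

public class PassthroughConsumerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Assumed broker address and group id, for illustration only.
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "passthrough-example");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    // The config introduced by this patch: skip large message processing and hand back raw records.
    props.setProperty(ConsumerConfig.ENABLE_SHALLOW_ITERATOR_CONFIG, "true");

    try (LiKafkaConsumer<byte[], byte[]> consumer = new LiKafkaConsumerImpl<>(props)) {
      consumer.subscribe(Collections.singletonList("passthrough-topic")); // assumed topic name
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
      for (ConsumerRecord<byte[], byte[]> record : records) {
        // With pass-through enabled, each record is expected to be a PassThroughConsumerRecord
        // carrying the raw (possibly compressed) batch bytes, so the value is treated as opaque.
        int size = record.value() == null ? 0 : record.value().length;
        System.out.printf("offset=%d, raw value bytes=%d%n", record.offset(), size);
      }
    }
  }
}

Note that with pass-through enabled the records bypass the ConsumerRecordsProcessor entirely (the else branch above), so large-message reassembly and its associated offset bookkeeping do not run in this mode.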