Bypass large message processing when passthrough is enabled for consumer (#193)

Co-authored-by: Ke Hu <[email protected]>
kehuum and Ke Hu authored Mar 3, 2021
1 parent a9db42e commit 5441f92
Showing 3 changed files with 80 additions and 28 deletions.
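
As context for the diffs below: with this commit, a consumer configured with the shallow-iterator flag gets raw records straight back from poll(), bypassing large message processing. A minimal usage sketch, assuming the class and config names that appear in the diffs and the usual li-apache-kafka-clients package layout; the bootstrap address, group id, and topic name are hypothetical placeholders, not taken from the commit:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import com.linkedin.kafka.clients.consumer.LiKafkaConsumer;
import com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class PassthroughPollSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "passthrough-demo");        // hypothetical group
    // The flag wired up by this commit; poll() then skips large message processing.
    props.setProperty(ConsumerConfig.ENABLE_SHALLOW_ITERATOR_CONFIG, "true");

    try (LiKafkaConsumer<byte[], byte[]> consumer = new LiKafkaConsumerImpl<>(props)) {
      consumer.subscribe(Collections.singletonList("some-topic")); // hypothetical topic
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      for (ConsumerRecord<byte[], byte[]> record : records) {
        // In passthrough mode a value may be a whole (potentially compressed) batch
        // rather than a single decoded record.
        System.out.printf("offset=%d, valueBytes=%d%n", record.offset(), record.value().length);
      }
    }
  }
}

Per the code comment in the third file, Brooklin-style passthrough mirroring is the intended user of this mode.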
@@ -33,6 +33,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.PassThroughConsumerRecord;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
@@ -56,26 +57,40 @@ public void tearDown() {
     super.tearDown();
   }

+  @Test
+  public void testPassthroughConsumerReturnRecordType() throws Exception {
+    String topic = "testPassthroughConsumerReturnRecordType";
+    createTopic(topic, 1);
+    Producer<byte[], byte[]> producer = createRawProducer();
+    produceRawRecordToTopic(topic, producer);
+
+    Properties extra = new Properties();
+    extra.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+    extra.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    extra.setProperty(ConsumerConfig.ENABLE_SHALLOW_ITERATOR_CONFIG, "true");
+    Properties baseConsumerConfig = getConsumerProperties(extra);
+    LiKafkaInstrumentedConsumerImpl<byte[], byte[]> consumer = new LiKafkaInstrumentedConsumerImpl<>(
+        baseConsumerConfig,
+        null,
+        (baseConfig, overrideConfig) -> new LiKafkaConsumerImpl<>(LiKafkaClientsUtils.getConsolidatedProperties(baseConfig, overrideConfig)),
+        () -> "bob",
+        1);
+
+    consumer.subscribe(Collections.singletonList(topic));
+    KafkaTestUtils.waitUntil("1st record batch", () -> {
+      ConsumerRecords<byte[], byte[]> recs = consumer.poll(Duration.ofSeconds(10));
+      return recs.count() > 0 && recs.iterator().next() instanceof PassThroughConsumerRecord;
+    }, 1, 2, TimeUnit.MINUTES, false);
+
+    consumer.close(Duration.ofSeconds(30));
+  }
+
   @Test
   public void testConsumerLiveConfigReload() throws Exception {
     String topic = "testConsumerLiveConfigReload";
     createTopic(topic, 1);
     Producer<byte[], byte[]> producer = createRawProducer();
-    for (int i = 0; i < 1000; i++) {
-      byte[] key = new byte[1024];
-      byte[] value = new byte[1024];
-      Arrays.fill(key, (byte) i);
-      Arrays.fill(value, (byte) i);
-      ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, 0, key, value);
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      DataOutputStream dos = new DataOutputStream(bos);
-      dos.writeInt(i);
-      dos.close();
-      record.headers().add("recordNum", bos.toByteArray());
-      producer.send(record);
-    }
-    producer.flush();
-    producer.close(1, TimeUnit.MINUTES);
+    produceRawRecordToTopic(topic, producer);

     MarioConfiguration marioConfiguration = MarioConfiguration.embeddableInMem();
     marioConfiguration.setEnableNgSupport(false);
@@ -238,4 +253,22 @@ private void createTopic(String topicName, int numPartitions) throws Exception {
       adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, numPartitions, (short) 1))).all().get(1, TimeUnit.MINUTES);
     }
   }
+
+  private void produceRawRecordToTopic(String topicName, Producer<byte[], byte[]> producer) throws Exception {
+    for (int i = 0; i < 1000; i++) {
+      byte[] key = new byte[1024];
+      byte[] value = new byte[1024];
+      Arrays.fill(key, (byte) i);
+      Arrays.fill(value, (byte) i);
+      ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, 0, key, value);
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(bos);
+      dos.writeInt(i);
+      dos.close();
+      record.headers().add("recordNum", bos.toByteArray());
+      producer.send(record);
+    }
+    producer.flush();
+    producer.close(1, TimeUnit.MINUTES);
+  }
 }
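
The produceRawRecordToTopic helper encodes the loop counter into a "recordNum" header with DataOutputStream.writeInt. For reference, a consumer-side decode mirroring that encoding could look like this (a sketch; the RecordNumHeader class is made up for illustration and is not part of the commit):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

final class RecordNumHeader {
  // DataInputStream.readInt() reverses the DataOutputStream.writeInt() encoding used above.
  static int recordNum(ConsumerRecord<byte[], byte[]> record) throws IOException {
    Header header = record.headers().lastHeader("recordNum");
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(header.value()))) {
      return in.readInt();
    }
  }
}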
@@ -90,6 +90,10 @@ public class LiKafkaConsumerConfig extends AbstractConfig {

   private static final String AUDITOR_CLASS_DOC = "The auditor class to use for the consumer";

+  private static final String ENABLE_SHALLOW_ITERATOR_DOC = "If true, the consumer will return raw byte records to the caller"
+      + " directly, without performing large message processing. Shallow iteration means the consumer iterates over message"
+      + " batches shallowly, returning (potentially compressed) message batches instead of individual records.";
+
   private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in" +
       " the background.";

@@ -169,6 +173,11 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
         NoOpAuditor.class.getName(),
         Importance.LOW,
         AUDITOR_CLASS_DOC)
+    .define(ConsumerConfig.ENABLE_SHALLOW_ITERATOR_CONFIG,
+        Type.BOOLEAN,
+        "false",
+        Importance.LOW,
+        ENABLE_SHALLOW_ITERATOR_DOC)
     .define(ENABLE_AUTO_COMMIT_CONFIG,
         Type.BOOLEAN,
         "true",
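One detail in the define(...) call above: the default for the boolean ENABLE_SHALLOW_ITERATOR_CONFIG is passed as the string "false". Kafka's ConfigDef parses string defaults into the declared type, so getBoolean(...) still returns false when the property is unset. A self-contained sketch of that behavior (the "enable.shallow.iterator" key here is an illustrative stand-in, not necessarily the literal value of the constant):

import java.util.Collections;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class BooleanDefaultSketch {
  public static void main(String[] args) {
    ConfigDef def = new ConfigDef()
        .define("enable.shallow.iterator",      // illustrative key
            ConfigDef.Type.BOOLEAN,
            "false",                            // string default, parsed to a Boolean
            ConfigDef.Importance.LOW,
            "Return raw (potentially compressed) batches instead of processed records.");
    // AbstractConfig is a concrete class in the Kafka clients library, so it can be instantiated directly.
    AbstractConfig config = new AbstractConfig(def, Collections.emptyMap());
    System.out.println(config.getBoolean("enable.shallow.iterator")); // prints: false
  }
}
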
@@ -76,6 +76,7 @@ public class LiKafkaConsumerImpl<K, V> implements LiKafkaConsumer<K, V> {
   private final ConsumerRecordsProcessor<K, V> _consumerRecordsProcessor;
   private final LiKafkaConsumerRebalanceListener<K, V> _consumerRebalanceListener;
   private final LiKafkaOffsetCommitCallback _offsetCommitCallback;
+  private final boolean _passthroughEnabled;
   private final boolean _autoCommitEnabled;
   private final long _autoCommitInterval;
   private final boolean _throwExceptionOnInvalidOffsets;
@@ -119,6 +120,7 @@ private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,

     _autoCommitEnabled = configs.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
     _autoCommitInterval = configs.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+    _passthroughEnabled = configs.getBoolean(ConsumerConfig.ENABLE_SHALLOW_ITERATOR_CONFIG);
     _throwExceptionOnInvalidOffsets = configs.getBoolean(LiKafkaConsumerConfig.EXCEPTION_ON_INVALID_OFFSET_RESET_CONFIG);
     _offsetResetStrategy =
         LiOffsetResetStrategy.valueOf(configs.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
@@ -356,19 +358,27 @@ private ConsumerRecords<K, V> poll(long timeout, boolean includeMetadataInTimeout) {
       }
     }

-    _lastProcessedResult = _consumerRecordsProcessor.process(rawRecords);
-    processedRecords = _lastProcessedResult.consumerRecords();
-    // Clear the internal reference.
-    _lastProcessedResult.clearRecords();
-    // Rewind offset if there are processing exceptions.
-    seekToCurrentOffsetsOnRecordProcessingExceptions();
-
-    // this is an optimization
-    // if no records were processed try to throw exception in current poll()
-    if (processedRecords.isEmpty()) {
-      crpe = handleRecordProcessingException(null);
-      if (crpe != null) {
-        throw crpe;
+    /* Hack for passthrough mirroring: since Brooklin is the only user of the passthrough feature, and it only
+     * uses the raw <byte[], byte[]> consumer, there is no need to perform large message processing in this case;
+     * we just return the consumed records directly to the caller.
+     */
+    if (_passthroughEnabled) {
+      processedRecords = (ConsumerRecords<K, V>) rawRecords;
+    } else {
+      _lastProcessedResult = _consumerRecordsProcessor.process(rawRecords);
+      processedRecords = _lastProcessedResult.consumerRecords();
+      // Clear the internal reference.
+      _lastProcessedResult.clearRecords();
+      // Rewind offsets if there were processing exceptions.
+      seekToCurrentOffsetsOnRecordProcessingExceptions();
+
+      // This is an optimization:
+      // if no records were processed, try to throw the exception in the current poll().
+      if (processedRecords.isEmpty()) {
+        crpe = handleRecordProcessingException(null);
+        if (crpe != null) {
+          throw crpe;
+        }
       }
     }

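The "passthrough mirroring" mentioned in the comment is easiest to picture from the caller's side: the mirror consumes raw (possibly compressed) batches and re-produces the bytes unmodified, so skipping large message processing is both safe and cheaper. A hedged sketch of that pattern, assuming a consumer created with the shallow-iterator flag and a plain byte-array producer (the mirror topic name is hypothetical):

import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

final class PassthroughMirrorSketch {
  // One mirroring pass: forward raw bytes without decompressing or reassembling large messages.
  static void mirrorOnce(Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<byte[], byte[]> record : records) {
      producer.send(new ProducerRecord<>("mirror-topic", record.key(), record.value())); // hypothetical topic
    }
    producer.flush();
  }
}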
