From 4bddf071d2fa4bd9a6ce0cc9e9453eb60df69531 Mon Sep 17 00:00:00 2001 From: Virupaksha Swamy Uttanur Matada Date: Mon, 26 Aug 2019 11:27:59 -0700 Subject: [PATCH] Add kafka native record headers (#135) This commit introduces two new Kafka native record headers: 1. "_t" -- carries the timestamp at which the message was created by the producer. 2. "_lm" -- indicates that this message is part of a large message. --- .gitignore | 1 + .../LargeMessageIntegrationTest.java | 45 +++--- .../LiKafkaProducerIntegrationTest.java | 9 ++ .../utils/LiKafkaClientsTestUtils.java | 26 ++++ .../common/LargeMessageHeaderValue.java | 86 +++++++++++ .../ConsumerRecordsProcessor.java | 24 +++- .../clients/largemessage/MessageSplitter.java | 19 ++- .../largemessage/MessageSplitterImpl.java | 40 +++++- .../producer/LiKafkaProducerConfig.java | 6 + .../clients/producer/LiKafkaProducerImpl.java | 24 +++- .../kafka/clients/utils/Constants.java | 18 +++ .../utils/PrimitiveEncoderDecoder.java | 133 ++++++++++++++++++ .../utils/PrimitiveEncoderDecoderTest.java | 65 +++++++++ 13 files changed, 466 insertions(+), 30 deletions(-) create mode 100644 li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/LargeMessageHeaderValue.java create mode 100644 li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/Constants.java create mode 100644 li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/PrimitiveEncoderDecoder.java create mode 100644 li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/utils/PrimitiveEncoderDecoderTest.java diff --git a/.gitignore b/.gitignore index f9ddbc7..1df5c31 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ #gradle .gradle/ build/ +.mario/ #idea .idea/ diff --git a/integration-tests/src/test/java/com/linkedin/kafka/clients/largemessage/LargeMessageIntegrationTest.java b/integration-tests/src/test/java/com/linkedin/kafka/clients/largemessage/LargeMessageIntegrationTest.java index 164fc26..f89d13b 100644 --- a/integration-tests/src/test/java/com/linkedin/kafka/clients/largemessage/LargeMessageIntegrationTest.java +++ b/integration-tests/src/test/java/com/linkedin/kafka/clients/largemessage/LargeMessageIntegrationTest.java @@ -4,17 +4,19 @@ package com.linkedin.kafka.clients.largemessage; +import com.linkedin.kafka.clients.common.LargeMessageHeaderValue; import com.linkedin.kafka.clients.consumer.LiKafkaConsumer; import com.linkedin.kafka.clients.producer.LiKafkaProducer; +import com.linkedin.kafka.clients.utils.Constants; +import com.linkedin.kafka.clients.utils.LiKafkaClientsTestUtils; import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils; +import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder; import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness; import com.linkedin.kafka.clients.utils.tests.KafkaTestUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -29,8 +31,10 @@ import java.util.Set; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertFalse; + /** * The integration
test for large message. */ @@ -65,6 +69,7 @@ public void tearDown() { @Test public void testLargeMessage() { + long startTime = System.currentTimeMillis(); Properties props = new Properties(); props.setProperty("large.message.enabled", "true"); props.setProperty("max.message.segment.size", "200"); @@ -76,33 +81,29 @@ public void testLargeMessage() { /* The test will send 100 different large messages to broker, consume from broker and verify the message contents. Here for simplicity we use a large message segment as a large message, and chunk this */ - Map messages = new HashMap(); + Map messages = new HashMap<>(); int numberOfLargeMessages = 100; int largeMessageSize = 1000; - final Set ackedMessages = new HashSet(); + final Set ackedMessages = new HashSet<>(); // Produce large messages. for (int i = 0; i < numberOfLargeMessages; i++) { final String messageId = LiKafkaClientsUtils.randomUUID().toString().replace("-", ""); String message = messageId + KafkaTestUtils.getRandomString(largeMessageSize); messages.put(messageId, message); - largeMessageProducer.send(new ProducerRecord(TOPIC, message), - new Callback() { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - // The callback should have been invoked only once. - assertFalse(ackedMessages.contains(messageId)); - if (e == null) { - ackedMessages.add(messageId); - } - } - }); + largeMessageProducer.send(new ProducerRecord<>(TOPIC, message), (recordMetadata, e) -> { + // The callback should have been invoked only once. + assertFalse(ackedMessages.contains(messageId)); + if (e == null) { + ackedMessages.add(messageId); + } + }); } largeMessageProducer.close(); // All messages should have been sent. assertEquals(ackedMessages.size(), messages.size()); // Consume and verify the large messages - List partitions = new ArrayList(); + List partitions = new ArrayList<>(); for (int i = 0; i < NUM_PARTITIONS; i++) { partitions.add(new TopicPartition(TOPIC, i)); } @@ -116,6 +117,18 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) { consumptionStarted = true; } for (ConsumerRecord consumerRecord : records) { + // Verify headers + Map headers = LiKafkaClientsTestUtils.fetchSpecialHeaders(consumerRecord.headers()); + assertTrue(headers.containsKey(Constants.TIMESTAMP_HEADER)); + assertEquals(PrimitiveEncoderDecoder.LONG_SIZE, headers.get(Constants.TIMESTAMP_HEADER).length); + long eventTimestamp = PrimitiveEncoderDecoder.decodeLong(headers.get(Constants.TIMESTAMP_HEADER), 0); + assertTrue(eventTimestamp >= startTime && eventTimestamp <= System.currentTimeMillis()); + assertTrue(headers.containsKey(Constants.LARGE_MESSAGE_HEADER)); + LargeMessageHeaderValue largeMessageHeaderValue = LargeMessageHeaderValue.fromBytes(headers.get(Constants.LARGE_MESSAGE_HEADER)); + assertEquals(largeMessageHeaderValue.getSegmentNumber(), -1); + assertEquals(largeMessageHeaderValue.getNumberOfSegments(), 6); + assertEquals(largeMessageHeaderValue.getType(), LargeMessageHeaderValue.LEGACY); + String messageId = consumerRecord.value().substring(0, 32); String origMessage = messages.get(messageId); assertEquals(consumerRecord.value(), origMessage, "Messages should be the same"); diff --git a/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaProducerIntegrationTest.java b/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaProducerIntegrationTest.java index bec8370..c7f5c37 100644 --- 
a/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaProducerIntegrationTest.java +++ b/integration-tests/src/test/java/com/linkedin/kafka/clients/producer/LiKafkaProducerIntegrationTest.java @@ -6,6 +6,9 @@ import com.linkedin.kafka.clients.consumer.LiKafkaConsumer; import com.linkedin.kafka.clients.largemessage.errors.SkippableException; +import com.linkedin.kafka.clients.utils.Constants; +import com.linkedin.kafka.clients.utils.LiKafkaClientsTestUtils; +import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder; import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness; import java.io.IOException; import java.util.BitSet; @@ -54,6 +57,7 @@ public void tearDown() { */ @Test public void testSend() throws IOException, InterruptedException { + long startTime = System.currentTimeMillis(); Properties props = new Properties(); props.setProperty(ProducerConfig.ACKS_CONFIG, "-1"); final String tempTopic = "testTopic" + new Random().nextInt(1000000); @@ -74,6 +78,11 @@ public void testSend() throws IOException, InterruptedException { while (messageCount < RECORD_COUNT && System.currentTimeMillis() < startMs + 30000) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { + Map headers = LiKafkaClientsTestUtils.fetchSpecialHeaders(record.headers()); + assertTrue(headers.containsKey(Constants.TIMESTAMP_HEADER)); + long eventTimestamp = PrimitiveEncoderDecoder.decodeLong(headers.get(Constants.TIMESTAMP_HEADER), 0); + assertTrue(eventTimestamp >= startTime && eventTimestamp <= System.currentTimeMillis()); + int index = Integer.parseInt(record.value()); counts.set(index); messageCount++; diff --git a/integration-tests/src/test/java/com/linkedin/kafka/clients/utils/LiKafkaClientsTestUtils.java b/integration-tests/src/test/java/com/linkedin/kafka/clients/utils/LiKafkaClientsTestUtils.java index 167e3e8..4546bd8 100644 --- a/integration-tests/src/test/java/com/linkedin/kafka/clients/utils/LiKafkaClientsTestUtils.java +++ b/integration-tests/src/test/java/com/linkedin/kafka/clients/utils/LiKafkaClientsTestUtils.java @@ -8,8 +8,12 @@ import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.UUID; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; import static org.testng.Assert.assertEquals; @@ -52,4 +56,26 @@ public static String getRandomString(int length) { } return stringBuiler.toString(); } + + /** + * Special header keys have a "_" prefix and are managed internally by the clients. + * @param headers + * @return + */ + public static Map fetchSpecialHeaders(Headers headers) { + Map map = new HashMap<>(); + for (Header header : headers) { + + if (!header.key().startsWith("_")) { + // skip any non special header + continue; + } + + if (map.containsKey(header.key())) { + throw new IllegalStateException("Duplicate special header found " + header.key()); + } + map.put(header.key(), header.value()); + } + return map; + } } diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/LargeMessageHeaderValue.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/LargeMessageHeaderValue.java new file mode 100644 index 0000000..98de436 --- /dev/null +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/common/LargeMessageHeaderValue.java @@ -0,0 +1,86 @@ +/* + * Copyright 2019 LinkedIn Corp. 
Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information. + */ + +package com.linkedin.kafka.clients.common; + +import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder; +import java.util.UUID; + + +/** + * This class represents the header value for a large message. + * Every large message header takes up 25 bytes and is structured as follows + * + * | Type | UUID | segmentNumber | numberOfSegments | + * | 1 byte | 16 bytes | 4 bytes | 4 bytes | + * + * The Large message header values will be used to support large messages eventually. + * (as opposed to encoding large segment metadata info inside the payload) + */ +public class LargeMessageHeaderValue { + public static final UUID EMPTY_UUID = new UUID(0L, 0L); + public static final int INVALID_SEGMENT_ID = -1; + private final byte _type; + private final UUID _uuid; + private final int _segmentNumber; + private final int _numberOfSegments; + + // This indicates that the large message framework is using + // SegmentSerializer/SegmentDeserializer interface to split + // and assemble large message segments. + public static final byte LEGACY = (byte) 0; + + public LargeMessageHeaderValue(byte type, UUID uuid, int segmentNumber, int numberOfSegments) { + _type = type; + _uuid = uuid; + _segmentNumber = segmentNumber; + _numberOfSegments = numberOfSegments; + } + + public int getSegmentNumber() { + return _segmentNumber; + } + + public int getNumberOfSegments() { + return _numberOfSegments; + } + + public UUID getUuid() { + return _uuid; + } + + public byte getType() { + return _type; + } + + public static byte[] toBytes(LargeMessageHeaderValue largeMessageHeaderValue) { + byte[] serialized = new byte[25]; + int byteOffset = 0; + serialized[byteOffset] = largeMessageHeaderValue.getType(); + byteOffset += 1; // for type + PrimitiveEncoderDecoder.encodeLong(largeMessageHeaderValue.getUuid().getLeastSignificantBits(), serialized, byteOffset); + byteOffset += PrimitiveEncoderDecoder.LONG_SIZE; // for UUID(least significant bits) + PrimitiveEncoderDecoder.encodeLong(largeMessageHeaderValue.getUuid().getMostSignificantBits(), serialized, byteOffset); + byteOffset += PrimitiveEncoderDecoder.LONG_SIZE; // for UUID(most significant bits) + PrimitiveEncoderDecoder.encodeInt(largeMessageHeaderValue.getSegmentNumber(), serialized, byteOffset); + byteOffset += PrimitiveEncoderDecoder.INT_SIZE; // for segment number + PrimitiveEncoderDecoder.encodeInt(largeMessageHeaderValue.getNumberOfSegments(), serialized, byteOffset); + return serialized; + } + + public static LargeMessageHeaderValue fromBytes(byte[] bytes) { + int byteOffset = 0; + + byte type = bytes[byteOffset]; + byteOffset += 1; + long leastSignificantBits = PrimitiveEncoderDecoder.decodeLong(bytes, byteOffset); + byteOffset += PrimitiveEncoderDecoder.LONG_SIZE; + long mostSignificantBits = PrimitiveEncoderDecoder.decodeLong(bytes, byteOffset); + byteOffset += PrimitiveEncoderDecoder.LONG_SIZE; + int segmentNumber = PrimitiveEncoderDecoder.decodeInt(bytes, byteOffset); + byteOffset += PrimitiveEncoderDecoder.INT_SIZE; + int numberOfSegments = PrimitiveEncoderDecoder.decodeInt(bytes, byteOffset); + return new LargeMessageHeaderValue(type, new UUID(mostSignificantBits, leastSignificantBits), segmentNumber, numberOfSegments); + } +} diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/ConsumerRecordsProcessor.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/ConsumerRecordsProcessor.java index 64618e6..cad580c 100644 --- 
a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/ConsumerRecordsProcessor.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/ConsumerRecordsProcessor.java @@ -6,7 +6,9 @@ import com.linkedin.kafka.clients.auditing.AuditType; import com.linkedin.kafka.clients.auditing.Auditor; +import com.linkedin.kafka.clients.common.LargeMessageHeaderValue; import com.linkedin.kafka.clients.largemessage.errors.SkippableException; +import com.linkedin.kafka.clients.utils.Constants; import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils; import java.util.Collections; import java.util.Objects; @@ -17,6 +19,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Deserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -417,6 +422,21 @@ private ConsumerRecord handleConsumerRecord(ConsumerRecord } _partitionConsumerHighWatermark.computeIfAbsent(tp, _storedConsumerHighWatermark)._currentConsumerHighWatermark = consumerRecord.offset(); + // Create a new copy of the headers + Headers headers = new RecordHeaders(consumerRecord.headers()); + Header largeMessageHeader = headers.lastHeader(Constants.LARGE_MESSAGE_HEADER); + if (largeMessageHeader != null) { + LargeMessageHeaderValue largeMessageHeaderValue = LargeMessageHeaderValue.fromBytes(largeMessageHeader.value()); + // Once the large message header value is parsed, remove any such key from record headers + headers.remove(Constants.LARGE_MESSAGE_HEADER); + largeMessageHeaderValue = new LargeMessageHeaderValue( + largeMessageHeaderValue.getType(), + LargeMessageHeaderValue.EMPTY_UUID, + LargeMessageHeaderValue.INVALID_SEGMENT_ID, + largeMessageHeaderValue.getNumberOfSegments() + ); + headers.add(Constants.LARGE_MESSAGE_HEADER, LargeMessageHeaderValue.toBytes(largeMessageHeaderValue)); + } handledRecord = new ConsumerRecord<>( consumerRecord.topic(), @@ -428,7 +448,9 @@ private ConsumerRecord handleConsumerRecord(ConsumerRecord consumerRecord.serializedKeySize(), valueBytes == null ? 0 : valueBytes.length, _keyDeserializer.deserialize(consumerRecord.topic(), consumerRecord.key()), - value); + value, + headers + ); } return handledRecord; } diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitter.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitter.java index 7c6f28d..9cea30a 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitter.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitter.java @@ -8,6 +8,8 @@ import java.util.List; import java.util.UUID; +import org.apache.kafka.common.header.Headers; + /** * Message splitter for large messages @@ -84,6 +86,10 @@ List> split(String topic, * this large message. * @param key The key associated with the message. * @param serializedRecord the serialized bytes of large message to split + * @param headers headers for the producer record. + * If the header is null, a new record headers is created and large message specific values are added. 
+ * If the header is not null, any old large message keys are removed and new values as generated by the + * current #split() call will be added. * @return A list of IndexedRecord each contains a chunk of the original large message. */ List> split(String topic, @@ -91,7 +97,8 @@ List> split(String topic, Long timestamp, UUID messageId, byte[] key, - byte[] serializedRecord); + byte[] serializedRecord, + Headers headers); /** * Split the large message into several {@link org.apache.kafka.clients.producer.ProducerRecord} @@ -106,6 +113,10 @@ List> split(String topic, * @param key The key associated with the message. * @param serializedRecord the serialized bytes of large message to split * @param maxSegmentSize the max segment size to use to split the message + * @param headers headers for the producer record. +` * If the header is null, a new record headers is created and large message specific values are added. + * If the header is not null, any old large message keys are removed and new values as generated by the + * current #split() call will be added. * @return A list of IndexedRecord each contains a chunk of the original large message. */ List> split(String topic, @@ -114,7 +125,7 @@ List> split(String topic, UUID messageId, byte[] key, byte[] serializedRecord, - int maxSegmentSize); - -} + int maxSegmentSize, + Headers headers); +} \ No newline at end of file diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitterImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitterImpl.java index b18d65b..f9f1fd7 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitterImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitterImpl.java @@ -4,10 +4,14 @@ package com.linkedin.kafka.clients.largemessage; +import com.linkedin.kafka.clients.common.LargeMessageHeaderValue; +import com.linkedin.kafka.clients.utils.Constants; import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils; import com.linkedin.kafka.clients.producer.UUIDFactory; import java.util.Collections; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serializer; import java.nio.ByteBuffer; @@ -49,13 +53,15 @@ public List> split(String topic, Integer partitio } @Override + @Deprecated public List> split(String topic, Integer partition, UUID messageId, byte[] key, byte[] serializedRecord) { - return split(topic, partition, null, messageId, key, serializedRecord); + return split(topic, partition, null, messageId, key, serializedRecord, null); } @Override - public List> split(String topic, Integer partition, Long timestamp, UUID messageId, byte[] key, byte[] serializedRecord) { - return split(topic, partition, timestamp, messageId, key, serializedRecord, _maxSegmentSize); + public List> split(String topic, Integer partition, + Long timestamp, UUID messageId, byte[] key, byte[] serializedRecord, Headers headers) { + return split(topic, partition, timestamp, messageId, key, serializedRecord, _maxSegmentSize, headers); } @Override @@ -65,13 +71,19 @@ public List> split(String topic, UUID messageId, byte[] key, byte[] serializedRecord, - int maxSegmentSize) { + int maxSegmentSize, + Headers headers) { if (topic == null) { throw new IllegalArgumentException("Topic cannot be empty."); } if 
(serializedRecord == null || serializedRecord.length == 0) { return Collections.singletonList(new ProducerRecord<>(topic, partition, timestamp, key, serializedRecord)); } + if (headers == null) { + // If null, create a new RecordHeaders + headers = new RecordHeaders(); + } + // We allow message id to be null, but it is strongly recommended to pass in a message id. UUID segmentMessageId = messageId == null ? _uuidFactory.createUuid() : messageId; List> segments = new ArrayList<>(); @@ -96,8 +108,26 @@ public List> split(String topic, // NOTE: Even though we are passing topic here to serialize, the segment itself should be topic independent. byte[] segmentValue = _segmentSerializer.serialize(topic, segment); + + // Make a temporary copy of headers because we'd be overwriting {@link Constants.LARGE_MESSAGE_HEADER} + Headers temporaryHeaders = new RecordHeaders(headers); + temporaryHeaders.remove(Constants.LARGE_MESSAGE_HEADER); + LargeMessageHeaderValue largeMessageHeaderValue = new LargeMessageHeaderValue( + LargeMessageHeaderValue.LEGACY, + messageId, + seq, + numberOfSegments + ); + temporaryHeaders.add(Constants.LARGE_MESSAGE_HEADER, LargeMessageHeaderValue.toBytes(largeMessageHeaderValue)); ProducerRecord segmentProducerRecord = - new ProducerRecord<>(topic, partition, timestamp, segmentKey, segmentValue); + new ProducerRecord<>( + topic, + partition, + timestamp, + segmentKey, + segmentValue, + temporaryHeaders + ); segments.add(segmentProducerRecord); } diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerConfig.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerConfig.java index 1024463..dd212c9 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerConfig.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerConfig.java @@ -32,6 +32,7 @@ public class LiKafkaProducerConfig extends AbstractConfig { private static final ConfigDef CONFIG; public static final String LARGE_MESSAGE_ENABLED_CONFIG = "large.message.enabled"; + public static final String HEADER_TIMESTAMP_ENABLED = "header.timestamp.enabled"; public static final String MAX_MESSAGE_SEGMENT_BYTES_CONFIG = "max.message.segment.bytes"; public static final String AUDITOR_CLASS_CONFIG = "auditor.class"; public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; @@ -53,6 +54,10 @@ public class LiKafkaProducerConfig extends AbstractConfig { "If large message is enabled, the producer will split the messages whose size is greater than " + "max.message.segment.bytes to multiple smaller messages and send them separately."; + public static final String HEADER_TIMESTAMP_ENABLED_DOC = "Configure the producer to stamp event time in the headers. " + + "Note that any previously stamped values will be overwritten. " + + "If enabled, the new timestamp in the header will be equal to ProducerRecord.timestamp() if non-null; " + + "otherwise, System.currentTimeMillis() will be used."; public static final String MAX_MESSAGE_SEGMENT_BYTES_DOC = "The maximum size of a large message segment. " + "This configuration is also used as the threshold of the definition of large messages, i.e. " + "the producer will only split the messages whose size is greater the maximum allowed segment bytes. " + @@ -91,6 +96,7 @@ public class LiKafkaProducerConfig extends AbstractConfig { // TODO: Add a default metadata service client class.
CONFIG = new ConfigDef() .define(LARGE_MESSAGE_ENABLED_CONFIG, Type.BOOLEAN, "false", Importance.MEDIUM, LARGE_MESSAGE_ENABLED_DOC) + .define(HEADER_TIMESTAMP_ENABLED, Type.BOOLEAN, "true", Importance.MEDIUM, HEADER_TIMESTAMP_ENABLED_DOC) .define(MAX_MESSAGE_SEGMENT_BYTES_CONFIG, Type.INT, "800000", Importance.MEDIUM, MAX_MESSAGE_SEGMENT_BYTES_DOC) .define(AUDITOR_CLASS_CONFIG, Type.CLASS, NoOpAuditor.class.getName(), Importance.MEDIUM, AUDITOR_CLASS_DOC) .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, ByteArraySerializer.class.getName(), Importance.MEDIUM, KEY_SERIALIZER_CLASS_DOC) diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerImpl.java index 9e7b132..1ec8515 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerImpl.java @@ -11,6 +11,8 @@ import com.linkedin.kafka.clients.largemessage.MessageSplitter; import com.linkedin.kafka.clients.largemessage.MessageSplitterImpl; import com.linkedin.kafka.clients.largemessage.errors.SkippableException; +import com.linkedin.kafka.clients.utils.Constants; +import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.List; @@ -35,6 +37,8 @@ import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serializer; import org.slf4j.Logger; @@ -115,6 +119,7 @@ public class LiKafkaProducerImpl implements LiKafkaProducer { // Large message settings private final boolean _largeMessageEnabled; + private final boolean _enableHeaderTimestamps; private final int _maxMessageSegmentSize; private final MessageSplitter _messageSplitter; private final boolean _largeMessageSegmentWrappingRequired; @@ -178,7 +183,7 @@ private LiKafkaProducerImpl(LiKafkaProducerConfig configs, producerSupportsBoundedFlush = null; } _boundedFlushMethod = producerSupportsBoundedFlush; - + _enableHeaderTimestamps = configs.getBoolean(LiKafkaProducerConfig.HEADER_TIMESTAMP_ENABLED); try { // Instantiate the key serializer if necessary. _keySerializer = keySerializer != null ? keySerializer @@ -238,6 +243,17 @@ public Future send(ProducerRecord producerRecord, Callback Integer partition = producerRecord.partition(); Future future = null; UUID messageId = _uuidFactory.getUuid(producerRecord); + + Headers headers = producerRecord.headers(); + if (_enableHeaderTimestamps) { + if (headers == null) { + headers = new RecordHeaders(); + } + // Remove any header that maybe using the key for audit event timestamp or large message. 
+ headers.remove(Constants.TIMESTAMP_HEADER); + headers.add(Constants.TIMESTAMP_HEADER, PrimitiveEncoderDecoder.encodeLong(timestamp)); + } + if (LOG.isTraceEnabled()) { LOG.trace("Sending event: [{}, {}] with key {} to kafka topic {}", messageId.toString().replaceAll("-", ""), @@ -266,7 +282,7 @@ public Future send(ProducerRecord producerRecord, Callback if (_largeMessageEnabled && serializedValueLength > _maxMessageSegmentSize) { // Split the payload into large message segments List> segmentRecords = - _messageSplitter.split(topic, partition, timestamp, messageId, serializedKey, serializedValue); + _messageSplitter.split(topic, partition, timestamp, messageId, serializedKey, serializedValue, headers); Callback largeMessageCallback = new LargeMessageCallback(segmentRecords.size(), errorLoggingCallback); for (ProducerRecord segmentRecord : segmentRecords) { future = _producer.send(segmentRecord, largeMessageCallback); @@ -275,14 +291,14 @@ public Future send(ProducerRecord producerRecord, Callback // Wrap the paylod with a large message segment, even if the payload is not big enough to split List> wrappedRecord = _messageSplitter.split(topic, partition, timestamp, messageId, serializedKey, serializedValue, - serializedValueLength); + serializedValueLength, headers); if (wrappedRecord.size() != 1) { throw new IllegalStateException("Failed to create a large message segment wrapped message"); } future = _producer.send(wrappedRecord.get(0), errorLoggingCallback); } else { // Do not wrap with a large message segment - future = _producer.send(new ProducerRecord(topic, partition, timestamp, serializedKey, serializedValue), + future = _producer.send(new ProducerRecord(topic, partition, timestamp, serializedKey, serializedValue, headers), errorLoggingCallback); } failed = false; diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/Constants.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/Constants.java new file mode 100644 index 0000000..9a29f9a --- /dev/null +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/Constants.java @@ -0,0 +1,18 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information. + */ + +package com.linkedin.kafka.clients.utils; + +public class Constants { + // Header keys reserved by the clients for internal use (producer timestamp and large message metadata) + public static final String TIMESTAMP_HEADER = "_t"; + public static final String LARGE_MESSAGE_HEADER = "_lm"; + + /** + * Avoid instantiating the constants class + */ + private Constants() { + + } +} diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/PrimitiveEncoderDecoder.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/PrimitiveEncoderDecoder.java new file mode 100644 index 0000000..4c8f640 --- /dev/null +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/utils/PrimitiveEncoderDecoder.java @@ -0,0 +1,133 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information. + */ + +package com.linkedin.kafka.clients.utils; + +/** + * Class to encode/decode primitive integer/long types. + */ +public class PrimitiveEncoderDecoder { + // The number of bytes for a long variable + public static final int LONG_SIZE = Long.SIZE / Byte.SIZE; + public static final int INT_SIZE = Integer.SIZE / Byte.SIZE; + + /** + * Avoid instantiating PrimitiveEncoderDecoder class + */ + private PrimitiveEncoderDecoder() { + + } + + /** + * Encodes a long value into a {@link PrimitiveEncoderDecoder#LONG_SIZE} byte array + */ + public static void encodeLong(long value, byte[] output, int pos) { + if (output == null) { + throw new IllegalArgumentException("The output array cannot be null"); + } + + if (pos < 0) { + throw new IllegalArgumentException("position cannot be less than zero"); + } + + if (output.length < pos + LONG_SIZE) { + throw new IllegalArgumentException( + String.format("Not adequate bytes available to encode the long value(array length = %d, pos = %d)", output.length, pos) + ); + } + + for (int i = LONG_SIZE - 1; i >= 0; i--) { + output[pos + i] = (byte) (value & 0xffL); + value >>= 8; + } + } + + /** + * Encodes a long value into a newly created byte[] and returns it. + */ + public static byte[] encodeLong(long value) { + byte[] data = new byte[LONG_SIZE]; + encodeLong(value, data, 0); + return data; + } + + /** + * Encodes an int value into a {@link PrimitiveEncoderDecoder#INT_SIZE} byte array + */ + public static void encodeInt(int value, byte[] output, int pos) { + if (output == null) { + throw new IllegalArgumentException("The output array cannot be null"); + } + + if (pos < 0) { + throw new IllegalArgumentException("position cannot be less than zero"); + } + + if (output.length < pos + INT_SIZE) { + throw new IllegalArgumentException( + String.format("Not adequate bytes available to encode the int value(array length = %d, pos = %d)", output.length, pos) + ); + } + + output[pos] = (byte) (value >> 24); + output[pos + 1] = (byte) (value >> 16); + output[pos + 2] = (byte) (value >> 8); + output[pos + 3] = (byte) value; + } + + /** + * Encodes an int value into a newly created byte[] and returns it. + */ + public static byte[] encodeInt(int value) { + byte[] data = new byte[INT_SIZE]; + encodeInt(value, data, 0); + return data; + } + + public static int decodeInt(byte[] input, int pos) { + if (input == null) { + throw new IllegalArgumentException("bytes cannot be null"); + } + + if (pos < 0) { + throw new IllegalArgumentException("position cannot be less than zero"); + } + + if (input.length < pos + INT_SIZE) { + throw new IllegalArgumentException( + String.format("Not adequate bytes available in the input array(array length = %d, pos = %d)", input.length, pos) + ); + } + + return input[pos] << 24 | (input[pos + 1] & 0xFF) << 16 | (input[pos + 2] & 0xFF) << 8 | (input[pos + 3] & 0xFF); + } + + /** + * Decodes {@link PrimitiveEncoderDecoder#LONG_SIZE} bytes from the given offset in the input byte array + */ + public static long decodeLong(byte[] input, int pos) { + if (input == null) { + throw new IllegalArgumentException("bytes cannot be null"); + } + + if (pos < 0) { + throw new IllegalArgumentException("position cannot be less than zero"); + } + + if (input.length < pos + LONG_SIZE) { + throw new IllegalArgumentException( + String.format("Not adequate bytes available in the input array(array length = %d, pos = %d)", input.length, pos) + ); + } + + return (input[pos] & 0xFFL) << 56 + | (input[pos + 1] & 0xFFL) << 48 + |
(input[pos + 2] & 0xFFL) << 40 + | (input[pos + 3] & 0xFFL) << 32 + | (input[pos + 4] & 0xFFL) << 24 + | (input[pos + 5] & 0xFFL) << 16 + | (input[pos + 6] & 0xFFL) << 8 + | (input[pos + 7] & 0xFFL); + } +} \ No newline at end of file diff --git a/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/utils/PrimitiveEncoderDecoderTest.java b/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/utils/PrimitiveEncoderDecoderTest.java new file mode 100644 index 0000000..1f4a492 --- /dev/null +++ b/li-apache-kafka-clients/src/test/java/com/linkedin/kafka/clients/utils/PrimitiveEncoderDecoderTest.java @@ -0,0 +1,65 @@ +/* + * Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information. + */ + +package com.linkedin.kafka.clients.utils; + +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class PrimitiveEncoderDecoderTest { + private void verifyLong(long x) { + byte[] data = new byte[8]; + PrimitiveEncoderDecoder.encodeLong(x, data, 0); + Assert.assertEquals(data.length, 8); + Assert.assertEquals(PrimitiveEncoderDecoder.decodeLong(data, 0), x); + Assert.assertEquals(PrimitiveEncoderDecoder.decodeLong(PrimitiveEncoderDecoder.encodeLong(x), 0), x); + } + + private void verifyInt(int x) { + byte[] data = new byte[4]; + PrimitiveEncoderDecoder.encodeInt(x, data, 0); + Assert.assertEquals(data.length, 4); + Assert.assertEquals(PrimitiveEncoderDecoder.decodeInt(data, 0), x); + Assert.assertEquals(PrimitiveEncoderDecoder.decodeInt(PrimitiveEncoderDecoder.encodeInt(x), 0), x); + } + + @Test + public void testEncodeDecodeLong() { + verifyLong(Long.MIN_VALUE); + verifyLong(Long.MAX_VALUE); + verifyLong(-1L); + verifyLong(0L); + verifyLong(1L); + verifyLong(1000000000L); + verifyLong(-1000000000L); + verifyLong(System.currentTimeMillis()); + + verifyInt(Integer.MIN_VALUE); + verifyInt(Integer.MAX_VALUE); + verifyInt(-1); + verifyInt(0); + verifyInt(1); + verifyInt(1000000000); + verifyInt(-1000000000); + } + + @Test(expectedExceptions = {IllegalArgumentException.class}) + public void testInsufficientDataForDecodeLong() { + long value = 100; + byte[] serialized = PrimitiveEncoderDecoder.encodeLong(value); + Assert.assertNotNull(serialized); + Assert.assertEquals(serialized.length, 8); + Assert.assertNotEquals(PrimitiveEncoderDecoder.decodeLong(serialized, 1), value); + } + + @Test(expectedExceptions = {IllegalArgumentException.class}) + public void testInsufficientDataForDecodeInt() { + int value = 100; + byte[] serialized = PrimitiveEncoderDecoder.encodeInt(value); + Assert.assertNotNull(serialized); + Assert.assertEquals(serialized.length, 4); + Assert.assertNotEquals(PrimitiveEncoderDecoder.decodeInt(serialized, 1), value); + } +}
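
As a quick illustration of how a downstream application might read the new headers, the sketch below pulls "_t" and "_lm" off a ConsumerRecord using the Constants, PrimitiveEncoderDecoder and LargeMessageHeaderValue classes added by this patch. The class name, the String/String record types and the println calls are illustrative assumptions, not part of the patch.

import com.linkedin.kafka.clients.common.LargeMessageHeaderValue;
import com.linkedin.kafka.clients.utils.Constants;
import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class HeaderInspectionExample {
  // Prints the producer timestamp and large-message metadata carried by the new headers, if present.
  public static void inspect(ConsumerRecord<String, String> record) {
    Header timestampHeader = record.headers().lastHeader(Constants.TIMESTAMP_HEADER);
    if (timestampHeader != null) {
      // "_t" holds the producer-side creation time as an 8-byte big-endian long.
      long producerTimestamp = PrimitiveEncoderDecoder.decodeLong(timestampHeader.value(), 0);
      System.out.println("Produced at " + producerTimestamp);
    }
    Header largeMessageHeader = record.headers().lastHeader(Constants.LARGE_MESSAGE_HEADER);
    if (largeMessageHeader != null) {
      // "_lm" carries the 25-byte LargeMessageHeaderValue described in the patch.
      LargeMessageHeaderValue value = LargeMessageHeaderValue.fromBytes(largeMessageHeader.value());
      System.out.println("Large message with " + value.getNumberOfSegments() + " segment(s)");
    }
  }
}

Note that ConsumerRecordsProcessor scrubs the UUID and segment number (replacing them with EMPTY_UUID and INVALID_SEGMENT_ID) before records reach the application, so on the LiKafkaConsumer side only the type and the number of segments remain meaningful.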
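The 25-byte layout documented in LargeMessageHeaderValue can be exercised with a round trip through toBytes/fromBytes. This is a minimal sketch based only on the API added in this patch; the concrete segment values are arbitrary.

import com.linkedin.kafka.clients.common.LargeMessageHeaderValue;
import java.util.UUID;

public class LargeMessageHeaderValueExample {
  public static void main(String[] args) {
    UUID messageId = UUID.randomUUID();
    // Type (1 byte) + UUID (16 bytes) + segmentNumber (4 bytes) + numberOfSegments (4 bytes) = 25 bytes.
    LargeMessageHeaderValue original =
        new LargeMessageHeaderValue(LargeMessageHeaderValue.LEGACY, messageId, 0, 6);
    byte[] serialized = LargeMessageHeaderValue.toBytes(original);  // serialized.length == 25
    LargeMessageHeaderValue decoded = LargeMessageHeaderValue.fromBytes(serialized);
    // The round trip preserves the UUID, segment number and segment count.
    System.out.println(decoded.getUuid().equals(messageId)
        && decoded.getSegmentNumber() == 0
        && decoded.getNumberOfSegments() == 6);
  }
}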
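Since header.timestamp.enabled defaults to true, producers stamp the "_t" header automatically; the sketch below shows how the new config could be turned off alongside the existing large-message settings. The bootstrap.servers value, the StringSerializer choice and the Properties-based LiKafkaProducerImpl constructor are assumptions for illustration, not something this patch defines.

import com.linkedin.kafka.clients.producer.LiKafkaProducer;
import com.linkedin.kafka.clients.producer.LiKafkaProducerImpl;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");  // assumed broker address
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty("large.message.enabled", "true");
    props.setProperty("max.message.segment.bytes", "800000");
    props.setProperty("header.timestamp.enabled", "false");  // opt out of the "_t" header
    LiKafkaProducer<String, String> producer = new LiKafkaProducerImpl<>(props);  // assumed constructor
    producer.send(new ProducerRecord<>("test-topic", "hello"));
    producer.close();
  }
}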