diff --git a/integration-tests/src/test/java/com/linkedin/kafka/clients/largemessage/LargeMessageIntegrationTest.java b/integration-tests/src/test/java/com/linkedin/kafka/clients/largemessage/LargeMessageIntegrationTest.java
index 8c25be2..e6eb46f 100644
--- a/integration-tests/src/test/java/com/linkedin/kafka/clients/largemessage/LargeMessageIntegrationTest.java
+++ b/integration-tests/src/test/java/com/linkedin/kafka/clients/largemessage/LargeMessageIntegrationTest.java
@@ -34,6 +34,10 @@
 import java.util.Properties;
 import java.util.Set;
 
+import static com.linkedin.kafka.clients.producer.LiKafkaProducerConfig.LARGE_MESSAGE_ENABLED_CONFIG;
+import static com.linkedin.kafka.clients.producer.LiKafkaProducerConfig.LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG;
+import static com.linkedin.kafka.clients.producer.LiKafkaProducerConfig.MAX_MESSAGE_SEGMENT_BYTES_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.assertFalse;
@@ -71,6 +75,37 @@ public void tearDown() {
     super.tearDown();
   }
 
+  @Test
+  public void testAlwaysUseLargeMessageEnvelope() throws Exception {
+    //create the test topic
+    try (AdminClient adminClient = createRawAdminClient(null)) {
+      adminClient.createTopics(Collections.singletonList(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1))).all().get(1, TimeUnit.MINUTES);
+    }
+
+    {
+      long startTime = System.currentTimeMillis();
+      Properties props = new Properties();
+      props.setProperty(LARGE_MESSAGE_ENABLED_CONFIG, "true");
+      props.setProperty(LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG, "true");
+      props.setProperty(MAX_MESSAGE_SEGMENT_BYTES_CONFIG, "200");
+      props.setProperty(CLIENT_ID_CONFIG, "testProducer");
+      LiKafkaProducer largeMessageProducer = createProducer(props);
+
+      // This is how large we expect the final message to be, including the version byte, checksum, segment info and
+      // the user payload itself.
+      final int expectedProducedMessageSize =
+          + Byte.BYTES
+          + Integer.BYTES
+          + LargeMessageSegment.SEGMENT_INFO_OVERHEAD
+          + "hello".length();
+
+      largeMessageProducer.send(new ProducerRecord<>(TOPIC, "hello"), (recordMetadata, e) -> {
+        assertEquals(recordMetadata.serializedValueSize(), expectedProducedMessageSize);
+      });
+      largeMessageProducer.close();
+    }
+  }
+
   @Test
   public void testLargeMessage() throws Exception {
     //create the test topic
@@ -150,7 +185,7 @@
 
   private static Properties buildConsumerProps() {
     Properties props = new Properties();
-    props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testLargeMessageConsumer");
+    props.setProperty(CLIENT_ID_CONFIG, "testLargeMessageConsumer");
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testLargeMessageConsumer");
     props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/DefaultSegmentSerializer.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/DefaultSegmentSerializer.java
index ec99897..94fd9e6 100644
--- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/DefaultSegmentSerializer.java
+++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/DefaultSegmentSerializer.java
@@ -30,21 +30,17 @@ public void configure(Map configs, boolean isKey) {
 
   @Override
   public byte[] serialize(String s, LargeMessageSegment segment) {
-    if (segment.numberOfSegments > 1) {
-      ByteBuffer byteBuffer = ByteBuffer.allocate(1 + LargeMessageSegment.SEGMENT_INFO_OVERHEAD +
-          segment.payload.limit() + CHECKSUM_LENGTH);
-      byteBuffer.put(LargeMessageSegment.CURRENT_VERSION);
-      byteBuffer.putInt((int) (segment.messageId.getMostSignificantBits() + segment.messageId.getLeastSignificantBits()));
-      byteBuffer.putLong(segment.messageId.getMostSignificantBits());
-      byteBuffer.putLong(segment.messageId.getLeastSignificantBits());
-      byteBuffer.putInt(segment.sequenceNumber);
-      byteBuffer.putInt(segment.numberOfSegments);
-      byteBuffer.putInt(segment.messageSizeInBytes);
-      byteBuffer.put(segment.payload);
-      return byteBuffer.array();
-    } else {
-      return segment.payloadArray();
-    }
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1 + LargeMessageSegment.SEGMENT_INFO_OVERHEAD +
+        segment.payload.limit() + CHECKSUM_LENGTH);
+    byteBuffer.put(LargeMessageSegment.CURRENT_VERSION);
+    byteBuffer.putInt((int) (segment.messageId.getMostSignificantBits() + segment.messageId.getLeastSignificantBits()));
+    byteBuffer.putLong(segment.messageId.getMostSignificantBits());
+    byteBuffer.putLong(segment.messageId.getLeastSignificantBits());
+    byteBuffer.putInt(segment.sequenceNumber);
+    byteBuffer.putInt(segment.numberOfSegments);
+    byteBuffer.putInt(segment.messageSizeInBytes);
+    byteBuffer.put(segment.payload);
+    return byteBuffer.array();
   }
 
   @Override
diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerConfig.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerConfig.java
index ebd5a1c..82738e3 100644
--- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerConfig.java
+++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaProducerConfig.java
@@ -117,7 +117,7 @@ public class LiKafkaProducerConfig extends AbstractConfig {
     .define(CLUSTER_GROUP_CONFIG, Type.STRING, "", Importance.MEDIUM, CLUSTER_GROUP_DOC)
     .define(CLUSTER_ENVIRONMENT_CONFIG, Type.STRING, "", Importance.MEDIUM, CLUSTER_ENVIRONMENT_DOC)
     .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.MEDIUM, MAX_REQUEST_SIZE_DOC)
-    .define(LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG, Type.BOOLEAN, "true", Importance.MEDIUM,
+    .define(LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG, Type.BOOLEAN, "false", Importance.MEDIUM,
         LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_DOC)
     .define(METADATA_SERVICE_REQUEST_TIMEOUT_MS_CONFIG, Type.INT, Integer.MAX_VALUE, Importance.MEDIUM,
         METADATA_SERVICE_REQUEST_TIMEOUT_MS_DOC);
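
For reference, the size that testAlwaysUseLargeMessageEnvelope asserts on can be reproduced from what DefaultSegmentSerializer.serialize() now always writes: one version byte, a four-byte checksum derived from the message id, the fixed segment metadata (two longs for the message id plus three ints), and the user payload. The sketch below is illustration only and not part of the patch; the class name and the ASSUMED_SEGMENT_INFO_OVERHEAD constant are assumptions standing in for LargeMessageSegment.SEGMENT_INFO_OVERHEAD.

// Illustration only (not part of the patch): recomputes the envelope size the new
// test expects, assuming SEGMENT_INFO_OVERHEAD equals the 28 bytes of segment
// metadata written by DefaultSegmentSerializer.serialize() after the checksum.
public final class SegmentEnvelopeSizeSketch {
  // messageId (2 longs) + sequenceNumber + numberOfSegments + messageSizeInBytes (3 ints)
  private static final int ASSUMED_SEGMENT_INFO_OVERHEAD = 2 * Long.BYTES + 3 * Integer.BYTES;

  static int envelopeSize(int payloadBytes) {
    return Byte.BYTES                      // CURRENT_VERSION byte
        + Integer.BYTES                    // checksum derived from the message id
        + ASSUMED_SEGMENT_INFO_OVERHEAD    // fixed segment metadata
        + payloadBytes;                    // the user payload itself
  }

  public static void main(String[] args) {
    // "hello" serializes to 5 bytes, so the expected envelope is 1 + 4 + 28 + 5 = 38 bytes.
    System.out.println(envelopeSize("hello".length()));
  }
}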