Fix behavior for li.large.message.segment.wrapping (#158)
The li.large.message.segment.wrapping producer property is supposed to
force the producer to always wrap messages in the large message
envelope, even if the message is not larger than
max.message.segment.bytes in size.

This patch addresses two issues with that property:

1. The option was enabled by default, which is not an ideal default. Since a
   bug meant it was effectively disabled anyway, it is safe to change the
   default value to "false".
2. When the li.large.message.segment.wrapping property is honored and a
   single-segment large message is created, the default serializer strips
   away the large message segment envelope because the segment count is 1.
   That branch is removed so the feature works correctly.

Added an integration test that shows li.large.message.segment.wrapping
functioning correctly when enabled.
ambroff authored Nov 21, 2019
1 parent 18c0850 commit 153bb1e
Showing 3 changed files with 48 additions and 17 deletions.
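
For context before the diffs, here is a minimal sketch of how an application would opt in to forced envelope wrapping after this change. It is not taken from this commit; the bootstrap address, topic name, and serializer choices are placeholders, and it assumes the LiKafkaProducerImpl(Properties) constructor.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;

import com.linkedin.kafka.clients.producer.LiKafkaProducer;
import com.linkedin.kafka.clients.producer.LiKafkaProducerImpl;

import static com.linkedin.kafka.clients.producer.LiKafkaProducerConfig.LARGE_MESSAGE_ENABLED_CONFIG;
import static com.linkedin.kafka.clients.producer.LiKafkaProducerConfig.LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG;
import static com.linkedin.kafka.clients.producer.LiKafkaProducerConfig.MAX_MESSAGE_SEGMENT_BYTES_CONFIG;

public class SegmentWrappingExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker address
    props.setProperty("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    // Enable large message support and force every record into the large message
    // envelope, even records smaller than the segment size.
    props.setProperty(LARGE_MESSAGE_ENABLED_CONFIG, "true");
    props.setProperty(LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG, "true");
    props.setProperty(MAX_MESSAGE_SEGMENT_BYTES_CONFIG, "200");

    try (LiKafkaProducer<String, String> producer = new LiKafkaProducerImpl<>(props)) {
      producer.send(new ProducerRecord<>("example-topic", "hello")); // "example-topic" is a placeholder
    }
  }
}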
@@ -34,6 +34,10 @@
 import java.util.Properties;
 import java.util.Set;
 
+import static com.linkedin.kafka.clients.producer.LiKafkaProducerConfig.LARGE_MESSAGE_ENABLED_CONFIG;
+import static com.linkedin.kafka.clients.producer.LiKafkaProducerConfig.LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG;
+import static com.linkedin.kafka.clients.producer.LiKafkaProducerConfig.MAX_MESSAGE_SEGMENT_BYTES_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.assertFalse;
@@ -71,6 +75,37 @@ public void tearDown() {
     super.tearDown();
   }
 
+  @Test
+  public void testAlwaysUseLargeMessageEnvelope() throws Exception {
+    //create the test topic
+    try (AdminClient adminClient = createRawAdminClient(null)) {
+      adminClient.createTopics(Collections.singletonList(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1))).all().get(1, TimeUnit.MINUTES);
+    }
+
+    {
+      long startTime = System.currentTimeMillis();
+      Properties props = new Properties();
+      props.setProperty(LARGE_MESSAGE_ENABLED_CONFIG, "true");
+      props.setProperty(LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG, "true");
+      props.setProperty(MAX_MESSAGE_SEGMENT_BYTES_CONFIG, "200");
+      props.setProperty(CLIENT_ID_CONFIG, "testProducer");
+      LiKafkaProducer<String, String> largeMessageProducer = createProducer(props);
+
+      // This is how large we expect the final message to be, including the version byte, checksum, segment info and
+      // the user payload itself.
+      final int expectedProducedMessageSize =
+          + Byte.BYTES
+          + Integer.BYTES
+          + LargeMessageSegment.SEGMENT_INFO_OVERHEAD
+          + "hello".length();
+
+      largeMessageProducer.send(new ProducerRecord<>(TOPIC, "hello"), (recordMetadata, e) -> {
+        assertEquals(recordMetadata.serializedValueSize(), expectedProducedMessageSize);
+      });
+      largeMessageProducer.close();
+    }
+  }
+
   @Test
   public void testLargeMessage() throws Exception {
     //create the test topic
@@ -150,7 +185,7 @@ public void testLargeMessage() throws Exception {
 
   private static Properties buildConsumerProps() {
     Properties props = new Properties();
-    props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testLargeMessageConsumer");
+    props.setProperty(CLIENT_ID_CONFIG, "testLargeMessageConsumer");
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testLargeMessageConsumer");
     props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
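
The expected size asserted in the new test is just the envelope header plus the payload. The sketch below works through that arithmetic under the assumption that LargeMessageSegment.SEGMENT_INFO_OVERHEAD is 28 bytes (two longs for the message id plus three ints), which matches the fields the default serializer writes between the checksum and the payload; it is not code from this repository.

public class EnvelopeSizeMath {
  public static void main(String[] args) {
    // Arithmetic behind expectedProducedMessageSize for the "hello" record in the new test.
    // Assumes SEGMENT_INFO_OVERHEAD == 28 (two longs for the message id plus three ints),
    // matching the fields the serializer writes between the checksum and the payload.
    int versionByte = Byte.BYTES;                          // 1
    int checksum = Integer.BYTES;                          // 4 (CHECKSUM_LENGTH in the serializer)
    int segmentInfo = 2 * Long.BYTES + 3 * Integer.BYTES;  // 16 + 12 = 28
    int payload = "hello".length();                        // 5
    System.out.println(versionByte + checksum + segmentInfo + payload); // 38 bytes on the wire
  }
}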
@@ -30,21 +30,17 @@ public void configure(Map<String, ?> configs, boolean isKey) {
 
   @Override
   public byte[] serialize(String s, LargeMessageSegment segment) {
-    if (segment.numberOfSegments > 1) {
-      ByteBuffer byteBuffer = ByteBuffer.allocate(1 + LargeMessageSegment.SEGMENT_INFO_OVERHEAD +
-          segment.payload.limit() + CHECKSUM_LENGTH);
-      byteBuffer.put(LargeMessageSegment.CURRENT_VERSION);
-      byteBuffer.putInt((int) (segment.messageId.getMostSignificantBits() + segment.messageId.getLeastSignificantBits()));
-      byteBuffer.putLong(segment.messageId.getMostSignificantBits());
-      byteBuffer.putLong(segment.messageId.getLeastSignificantBits());
-      byteBuffer.putInt(segment.sequenceNumber);
-      byteBuffer.putInt(segment.numberOfSegments);
-      byteBuffer.putInt(segment.messageSizeInBytes);
-      byteBuffer.put(segment.payload);
-      return byteBuffer.array();
-    } else {
-      return segment.payloadArray();
-    }
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1 + LargeMessageSegment.SEGMENT_INFO_OVERHEAD +
+        segment.payload.limit() + CHECKSUM_LENGTH);
+    byteBuffer.put(LargeMessageSegment.CURRENT_VERSION);
+    byteBuffer.putInt((int) (segment.messageId.getMostSignificantBits() + segment.messageId.getLeastSignificantBits()));
+    byteBuffer.putLong(segment.messageId.getMostSignificantBits());
+    byteBuffer.putLong(segment.messageId.getLeastSignificantBits());
+    byteBuffer.putInt(segment.sequenceNumber);
+    byteBuffer.putInt(segment.numberOfSegments);
+    byteBuffer.putInt(segment.messageSizeInBytes);
+    byteBuffer.put(segment.payload);
+    return byteBuffer.array();
   }
 
   @Override
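
To make the envelope layout concrete, the hypothetical reader below parses the bytes that serialize() now always produces, mirroring the write order above (version, checksum, message id, sequence number, segment count, message size, payload). It is illustrative only and not part of this commit.

import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical helper that mirrors the write order of the segment serializer above.
final class SegmentEnvelopeReader {
  static void dump(byte[] wireBytes) {
    ByteBuffer buffer = ByteBuffer.wrap(wireBytes);
    byte version = buffer.get();               // LargeMessageSegment.CURRENT_VERSION
    int checksum = buffer.getInt();            // (int) (msb + lsb) of the message id
    UUID messageId = new UUID(buffer.getLong(), buffer.getLong());
    int sequenceNumber = buffer.getInt();      // index of this segment
    int numberOfSegments = buffer.getInt();    // 1 for a wrapped small message
    int messageSizeInBytes = buffer.getInt();  // size of the full user payload
    byte[] payload = new byte[buffer.remaining()];
    buffer.get(payload);
    System.out.printf("v%d id=%s segment %d/%d, %d payload bytes%n",
        version, messageId, sequenceNumber + 1, numberOfSegments, payload.length);
  }
}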
@@ -117,7 +117,7 @@ public class LiKafkaProducerConfig extends AbstractConfig {
       .define(CLUSTER_GROUP_CONFIG, Type.STRING, "", Importance.MEDIUM, CLUSTER_GROUP_DOC)
       .define(CLUSTER_ENVIRONMENT_CONFIG, Type.STRING, "", Importance.MEDIUM, CLUSTER_ENVIRONMENT_DOC)
       .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.MEDIUM, MAX_REQUEST_SIZE_DOC)
-      .define(LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG, Type.BOOLEAN, "true", Importance.MEDIUM,
+      .define(LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG, Type.BOOLEAN, "false", Importance.MEDIUM,
           LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_DOC)
       .define(METADATA_SERVICE_REQUEST_TIMEOUT_MS_CONFIG, Type.INT, Integer.MAX_VALUE, Importance.MEDIUM,
           METADATA_SERVICE_REQUEST_TIMEOUT_MS_DOC);
