diff --git a/README.md b/README.md index 0517dfc..201df05 100644 --- a/README.md +++ b/README.md @@ -156,7 +156,7 @@ In this case the IP address is one of the nodes running the distributed mode wor { "class": "com.solace.connector.kafka.connect.source.SolaceSourceConnector", "type": "source", - "version": "3.0.1" + "version": "3.1.0" }, ``` @@ -374,6 +374,14 @@ For reference, this project includes two examples which you can use as starting * [SolSampleSimpleMessageProcessor](/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java) * [SolaceSampleKeyedMessageProcessor](/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java) +By default, these two processors do not map/forward Solace message user properties or Solace standard properties. To map/forward them as Kafka record headers, set the two properties below to `true` in the connector configuration. Refer to the sample [here](/etc/solace_source_properties.json) and the [Parameters](#parameters) section for details. + +``` +sol.message_processor.map_user_properties=true +sol.message_processor.map_solace_standard_properties=true +``` + + Once you've built the jar file for your custom message processor project, place it into the same directory as this connector, and update the connector's `sol.message_processor_class` config to point to the class of your new message processor. More information on Kafka source connector development can be found here: diff --git a/etc/solace_source.properties b/etc/solace_source.properties index 0290118..6e43efe 100644 --- a/etc/solace_source.properties +++ b/etc/solace_source.properties @@ -35,6 +35,13 @@ sol.message_processor_class=com.solace.connector.kafka.connect.source.msgprocess # If enabled, messages that throw message processor errors will be discarded. #sol.message_processor.error.ignore=false +# If enabled, maps/forwards the user properties map from the Solace message to Kafka record headers +#sol.message_processor.map_user_properties=false + +# If enabled, maps/forwards the Solace message standard properties (e.g. correlationId, applicationMessageId, redelivered, dmqEligible, COS, etc.) to Kafka record headers +# The Solace standard property names will be prefixed with "solace_" (e.g. 
correlationId as solace_correlationId) to Kafka record headers +#sol.message_processor.map_solace_standard_properties=false + # When using SolaceSampleKeyedMessageProcessor, defines which part of a # PubSub+ message shall be converted to a Kafka record key # Allowable values include: NONE, DESTINATION, CORRELATION_ID, CORRELATION_ID_AS_BYTES diff --git a/etc/solace_source_properties.json b/etc/solace_source_properties.json index 3bc7df1..aeb26ef 100644 --- a/etc/solace_source_properties.json +++ b/etc/solace_source_properties.json @@ -11,6 +11,8 @@ "sol.vpn_name": "default", "sol.topics": "sourcetest", "sol.message_processor_class": "com.solace.connector.kafka.connect.source.msgprocessors.SolSampleSimpleMessageProcessor", + "sol.message_processor.map_user_properties": "false", + "sol.message_processor.map_solace_standard_properties": "false", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "key.converter": "org.apache.kafka.connect.storage.StringConverter" } } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 67e1778..4bf2221 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ group=com.solace.connector.kafka.connect -version=3.0.1 \ No newline at end of file +version=3.1.0 \ No newline at end of file diff --git a/src/integrationTest/java/com/solace/connector/kafka/connect/source/it/SourceConnectorIT.java b/src/integrationTest/java/com/solace/connector/kafka/connect/source/it/SourceConnectorIT.java index 190e2d7..a1a9b58 100644 --- a/src/integrationTest/java/com/solace/connector/kafka/connect/source/it/SourceConnectorIT.java +++ b/src/integrationTest/java/com/solace/connector/kafka/connect/source/it/SourceConnectorIT.java @@ -1,5 +1,14 @@ package com.solace.connector.kafka.connect.source.it; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; @@ -9,18 +18,29 @@ import com.solace.connector.kafka.connect.source.it.util.extensions.KafkaArgumentsProvider.KafkaContext; import com.solace.connector.kafka.connect.source.it.util.extensions.pubsubplus.pubsubplus.NetworkPubSubPlusContainerProvider; import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension; +import com.solacesystems.common.util.ByteArray; import com.solacesystems.jcsmp.BytesMessage; import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPFactory; import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.Message; import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.SDTException; +import com.solacesystems.jcsmp.SDTMap; import com.solacesystems.jcsmp.TextMessage; import com.solacesystems.jcsmp.Topic; import com.solacesystems.jcsmp.impl.AbstractDestination; +import java.math.BigInteger; +import java.nio.ByteBuffer; 
+import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -33,18 +53,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; - -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; - @ExtendWith(PubSubPlusExtension.class) @ExtendWith(KafkaArgumentsProvider.AutoDeleteSolaceConnectorDeploymentAfterEach.class) public class SourceConnectorIT implements TestConstants { @@ -109,6 +117,26 @@ void messageToKafkaTest(Message msg, AbstractDestination destination, String exp assert (Arrays.equals(b, (byte[]) expectedKey)); } } + + if ("true".equalsIgnoreCase(connectorProps.getProperty(SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES)) + || "true".equalsIgnoreCase(connectorProps.getProperty(SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES))) { + + //verify user properties + final SDTMap solUserProperties = msg.getProperties(); + final RecordHeaders recordHeaders = new RecordHeaders(record.headers().toArray()); + if (solUserProperties != null && solUserProperties.keySet().size() > 0) { + LOG.info("Headers: " + recordHeaders); + solUserProperties.keySet().forEach(key -> assertNotNull(recordHeaders.remove(key))); //Removes header + } + + //Any remaining headers must be solace standard headers + if (recordHeaders.toArray().length > 0) { + LOG.info("Headers: " + recordHeaders); + recordHeaders.iterator().forEachRemaining(header -> assertTrue(header.key().startsWith("solace_"))); + } + } else { + assertThat(record.headers().toArray().length, equalTo(0)); + } } catch (JCSMPException e1) { e1.printStackTrace(); } @@ -130,6 +158,8 @@ void setUp() { solaceProducer.resetQueue(SOL_QUEUE); connectorProps.setProperty("sol.message_processor_class", "com.solace.connector.kafka.connect.source.msgprocessors.SolSampleSimpleMessageProcessor"); + connectorProps.setProperty("sol.message_processor.map_user_properties", "false"); + connectorProps.setProperty("sol.message_processor.map_solace_standard_properties", "false"); connectorProps.setProperty("sol.topics", "TestTopic1/SubTopic"); connectorProps.setProperty("sol.username", "test"); connectorProps.setProperty("sol.password", "test"); @@ -138,9 +168,10 @@ void setUp() { @DisplayName("TextMessage-Topic-SolSampleSimpleMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) { + void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); TextMessage msg = solaceProducer.createTextMessage("1-Hello TextMessageToTopicTest world!"); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineTopic("TestTopic1/SubTopic"), // 
expected value & key: "1-Hello TextMessageToTopicTest world!", null, kafkaContext); @@ -149,10 +180,11 @@ void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) { @DisplayName("ByteMessage-Topic-SolSampleSimpleMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerByteMessageToTopicTest(KafkaContext kafkaContext) { + void kafkaConsumerByteMessageToTopicTest(KafkaContext kafkaContext) throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); BytesMessage msg = solaceProducer.createBytesMessage(new byte[] { '2', '-', 'H', 'e', 'l', 'l', 'o', ' ', 'T', 'o', 'p', 'i', 'c', ' ', 'w', 'o', 'r', 'l', 'd', '!' }); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineTopic("TestTopic1/SubTopic"), // expected value & key: "2-Hello Topic world!", null, kafkaContext); @@ -161,11 +193,13 @@ void kafkaConsumerByteMessageToTopicTest(KafkaContext kafkaContext) { @DisplayName("ByteMessage-AttachmentPayload-Topic-SolSampleSimpleMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerByteMessageWithAttachmentPayloadToTopicTest(KafkaContext kafkaContext) { + void kafkaConsumerByteMessageWithAttachmentPayloadToTopicTest(KafkaContext kafkaContext) + throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); BytesMessage msg = solaceProducer.createBytesMessage(null); msg.writeAttachment(new byte[] { '3', '-', 'H', 'e', 'l', 'l', 'o', ' ', 'a', 't', 't', 'a', 'c', 'h', 'e', 'd', ' ', 'w', 'o', 'r', 'l', 'd', '!' }); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineTopic("TestTopic1/SubTopic"), // expected value & key: "3-Hello attached world!", null, kafkaContext); @@ -174,9 +208,10 @@ void kafkaConsumerByteMessageWithAttachmentPayloadToTopicTest(KafkaContext kafka @DisplayName("TextMessage-Queue-SolSampleSimpleMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerTextmessageToKafkaTest(KafkaContext kafkaContext) { + void kafkaConsumerTextmessageToKafkaTest(KafkaContext kafkaContext) throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); TextMessage msg = solaceProducer.createTextMessage("4-Hello TextmessageToKafkaTest world!"); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineQueue(SOL_QUEUE), // expected value & key: "4-Hello TextmessageToKafkaTest world!", null, kafkaContext); @@ -185,10 +220,11 @@ void kafkaConsumerTextmessageToKafkaTest(KafkaContext kafkaContext) { @DisplayName("BytesMessage-Queue-SolSampleSimpleMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerBytesmessageToKafkaTest(KafkaContext kafkaContext) { + void kafkaConsumerBytesmessageToKafkaTest(KafkaContext kafkaContext) throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); BytesMessage msg = solaceProducer.createBytesMessage(new byte[] { '5', '-', 'H', 'e', 'l', 'l', 'o', ' ', 'Q', 'u', 'e', 'u', 'e', ' ', 'w', 'o', 'r', 'l', 'd', '!' 
}); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineQueue(SOL_QUEUE), // expected value & key: "5-Hello Queue world!", null, kafkaContext); @@ -197,11 +233,13 @@ void kafkaConsumerBytesmessageToKafkaTest(KafkaContext kafkaContext) { @DisplayName("ByteMessage-AttachmentPayload-Queue-SolSampleSimpleMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerByteMessageWithAttachmentPayloadToQueueTest(KafkaContext kafkaContext) { + void kafkaConsumerByteMessageWithAttachmentPayloadToQueueTest(KafkaContext kafkaContext) + throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); BytesMessage msg = solaceProducer.createBytesMessage(null); msg.writeAttachment(new byte[] { '6', '-', 'H', 'e', 'l', 'l', 'o', ' ', 'a', 't', 't', 'a', 'c', 'h', 'e', 'd', ' ', 'w', 'o', 'r', 'l', 'd', '!' }); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineQueue(SOL_QUEUE), // expected value & key: "6-Hello attached world!", null, kafkaContext); @@ -221,6 +259,8 @@ void setUp() { solaceProducer.resetQueue(SOL_QUEUE); connectorProps.setProperty("sol.message_processor_class", "com.solace.connector.kafka.connect.source.msgprocessors.SolaceSampleKeyedMessageProcessor"); + connectorProps.setProperty("sol.message_processor.map_user_properties", "false"); + connectorProps.setProperty("sol.message_processor.map_solace_standard_properties", "false"); connectorProps.setProperty("sol.kafka_message_key", "NONE"); connectorProps.setProperty("sol.topics", "TestTopic1/SubTopic,TestTopic2/*,TestTopic3/>"); } @@ -228,9 +268,10 @@ void setUp() { @DisplayName("TextMessage-Topic-SolSampleKeyedMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) { + void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); TextMessage msg = solaceProducer.createTextMessage("Hello TextMessageToTopicTest1 world!"); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineTopic("TestTopic1/SubTopic"), // expected value & key: "Hello TextMessageToTopicTest1 world!", null, kafkaContext); @@ -239,10 +280,11 @@ void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) { @DisplayName("ByteMessage-Topic-SolSampleKeyedMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerByteMessageToTopicTest(KafkaContext kafkaContext) { + void kafkaConsumerByteMessageToTopicTest(KafkaContext kafkaContext) throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); BytesMessage msg = solaceProducer.createBytesMessage( new byte[] { 'H', 'e', 'l', 'l', 'o', ' ', 'T', 'o', 'p', 'i', 'c', ' ', 'w', 'o', 'r', 'l', 'd', '!' 
}); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineTopic("TestTopic1/SubTopic"), // expected value & key: "Hello Topic world!", null, kafkaContext); @@ -251,11 +293,13 @@ void kafkaConsumerByteMessageToTopicTest(KafkaContext kafkaContext) { @DisplayName("ByteMessage-AttachmentPayload-Topic-SolSampleKeyedMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerByteMessageWithAttachmentPayloadToTopicTest(KafkaContext kafkaContext) { + void kafkaConsumerByteMessageWithAttachmentPayloadToTopicTest(KafkaContext kafkaContext) + throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); BytesMessage msg = solaceProducer.createBytesMessage(null); msg.writeAttachment(new byte[] { 'H', 'e', 'l', 'l', 'o', ' ', 'a', 't', 't', 'a', 'c', 'h', 'e', 'd', ' ', 'w', 'o', 'r', 'l', 'd', '!' }); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineTopic("TestTopic1/SubTopic"), // expected value & key: "Hello attached world!", null, kafkaContext); @@ -264,9 +308,10 @@ void kafkaConsumerByteMessageWithAttachmentPayloadToTopicTest(KafkaContext kafka @DisplayName("TextMessage-Queue-SolSampleKeyedMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerTextmessageToKafkaTest(KafkaContext kafkaContext) { + void kafkaConsumerTextmessageToKafkaTest(KafkaContext kafkaContext) throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); TextMessage msg = solaceProducer.createTextMessage("Hello TextmessageToKafkaTest world!"); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineQueue(SOL_QUEUE), // expected value & key: "Hello TextmessageToKafkaTest world!", null, kafkaContext); @@ -275,10 +320,11 @@ void kafkaConsumerTextmessageToKafkaTest(KafkaContext kafkaContext) { @DisplayName("BytesMessage-Queue-SolSampleKeyedMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerBytesmessageToKafkaTest(KafkaContext kafkaContext) { + void kafkaConsumerBytesmessageToKafkaTest(KafkaContext kafkaContext) throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); BytesMessage msg = solaceProducer.createBytesMessage( new byte[] { 'H', 'e', 'l', 'l', 'o', ' ', 'Q', 'u', 'e', 'u', 'e', ' ', 'w', 'o', 'r', 'l', 'd', '!' }); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineQueue(SOL_QUEUE), // expected value & key: "Hello Queue world!", null, kafkaContext); @@ -287,11 +333,13 @@ void kafkaConsumerBytesmessageToKafkaTest(KafkaContext kafkaContext) { @DisplayName("ByteMessage-AttachmentPayload-Queue-SolSampleKeyedMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerByteMessageWithAttachmentPayloadToQueueTest(KafkaContext kafkaContext) { + void kafkaConsumerByteMessageWithAttachmentPayloadToQueueTest(KafkaContext kafkaContext) + throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); BytesMessage msg = solaceProducer.createBytesMessage(null); msg.writeAttachment(new byte[] { 'H', 'e', 'l', 'l', 'o', ' ', 'a', 't', 't', 'a', 'c', 'h', 'e', 'd', ' ', 'w', 'o', 'r', 'l', 'd', '!' 
}); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineQueue(SOL_QUEUE), // expected value & key: "Hello attached world!", null, kafkaContext); @@ -311,6 +359,8 @@ void setUp() { solaceProducer.resetQueue(SOL_QUEUE); connectorProps.setProperty("sol.message_processor_class", "com.solace.connector.kafka.connect.source.msgprocessors.SolaceSampleKeyedMessageProcessor"); + connectorProps.setProperty("sol.message_processor.map_user_properties", "false"); + connectorProps.setProperty("sol.message_processor.map_solace_standard_properties", "false"); connectorProps.setProperty("sol.kafka_message_key", "DESTINATION"); connectorProps.setProperty("sol.topics", "TestTopic1/SubTopic,TestTopic2/*,TestTopic3/>"); } @@ -396,6 +446,8 @@ void setUp() { solaceProducer.resetQueue(SOL_QUEUE); connectorProps.setProperty("sol.message_processor_class", "com.solace.connector.kafka.connect.source.msgprocessors.SolaceSampleKeyedMessageProcessor"); + connectorProps.setProperty("sol.message_processor.map_user_properties", "false"); + connectorProps.setProperty("sol.message_processor.map_solace_standard_properties", "false"); connectorProps.setProperty("sol.kafka_message_key", "CORRELATION_ID"); connectorProps.setProperty("sol.topics", "TestTopic1/SubTopic,TestTopic2/*,TestTopic3/>"); } @@ -463,6 +515,8 @@ void setUp() { solaceProducer.resetQueue(SOL_QUEUE); connectorProps.setProperty("sol.message_processor_class", "com.solace.connector.kafka.connect.source.msgprocessors.SolaceSampleKeyedMessageProcessor"); + connectorProps.setProperty("sol.message_processor.map_user_properties", "false"); + connectorProps.setProperty("sol.message_processor.map_solace_standard_properties", "false"); connectorProps.setProperty("sol.kafka_message_key", "CORRELATION_ID_AS_BYTES"); connectorProps.setProperty("sol.topics", "TestTopic1/SubTopic,TestTopic2/*,TestTopic3/>"); connectorProps.setProperty("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); @@ -532,6 +586,8 @@ void setUp() { solaceProducer.resetQueue(SOL_QUEUE); connectorProps.setProperty("sol.message_processor_class", "com.solace.connector.kafka.connect.source.msgprocessors.SolSampleSimpleMessageProcessor"); + connectorProps.setProperty("sol.message_processor.map_user_properties", "true"); + connectorProps.setProperty("sol.message_processor.map_solace_standard_properties", "true"); connectorProps.setProperty("sol.topics", "#share/group1/TestTopic1/SubTopic"); connectorProps.setProperty("tasks.max", "5"); } @@ -539,9 +595,10 @@ void setUp() { @DisplayName("TextMessage-Topic-SolSampleSimpleMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) { + void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) throws SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); TextMessage msg = solaceProducer.createTextMessage("Hello TextMessageToTopicTest world!"); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineTopic("TestTopic1/SubTopic"), // expected value & key: "Hello TextMessageToTopicTest world!", null, kafkaContext); @@ -550,10 +607,11 @@ void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) { @DisplayName("ByteMessage-Topic-SolSampleSimpleMessageProcessor") @ParameterizedTest @KafkaArgumentSource - void kafkaConsumerByteMessageToTopicTest(KafkaContext kafkaContext) { + void kafkaConsumerByteMessageToTopicTest(KafkaContext kafkaContext) throws 
SDTException { kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps); BytesMessage msg = solaceProducer.createBytesMessage( new byte[] { 'H', 'e', 'l', 'l', 'o', ' ', 'T', 'o', 'p', 'i', 'c', ' ', 'w', 'o', 'r', 'l', 'd', '!' }); + msg.setProperties(getTestUserProperties()); messageToKafkaTest(msg, solaceProducer.defineTopic("TestTopic1/SubTopic"), // expected value & key: "Hello Topic world!", null, kafkaContext); @@ -590,6 +648,8 @@ void setUp() { void testFailPubSubConnection(KafkaContext kafkaContext) { connectorProps.setProperty("sol.message_processor_class", "com.solace.connector.kafka.connect.source.msgprocessors.SolSampleSimpleMessageProcessor"); + connectorProps.setProperty("sol.message_processor.map_user_properties", "true"); + connectorProps.setProperty("sol.message_processor.map_solace_standard_properties", "true"); connectorProps.setProperty("sol.vpn_name", RandomStringUtils.randomAlphanumeric(10)); kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps, true); AtomicReference connectorStatus = new AtomicReference<>(new JsonObject()); @@ -604,4 +664,28 @@ void testFailPubSubConnection(KafkaContext kafkaContext) { }, () -> "Timed out waiting for connector to fail: " + GSON.toJson(connectorStatus.get())); } } + + private SDTMap getTestUserProperties() throws SDTException { + final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap(); + solMsgUserProperties.putObject("null-value-user-property", null); + solMsgUserProperties.putBoolean("boolean-user-property", true); + solMsgUserProperties.putCharacter("char-user-property", 'C'); + solMsgUserProperties.putDouble("double-user-property", 123.4567); + solMsgUserProperties.putFloat("float-user-property", 000.1f); + solMsgUserProperties.putInteger("int-user-property", 1); + solMsgUserProperties.putLong("long-user-property", 10000L); + solMsgUserProperties.putShort("short-user-property", Short.valueOf("20")); + solMsgUserProperties.putString("string-user-property", "value1"); + solMsgUserProperties.putObject("bigInteger-user-property", new BigInteger("123456")); + solMsgUserProperties.putByte("byte-user-property", "A".getBytes()[0]); + solMsgUserProperties.putBytes("bytes-user-property", "Hello".getBytes()); + solMsgUserProperties.putByteArray("byteArray-user-property", + new ByteArray("Hello World".getBytes())); + solMsgUserProperties.putDestination("topic-user-property", + JCSMPFactory.onlyInstance().createTopic("testTopic")); + solMsgUserProperties.putDestination("queue-user-property", + JCSMPFactory.onlyInstance().createTopic("testQueue")); + + return solMsgUserProperties; + } } diff --git a/src/main/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIF.java b/src/main/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIF.java index 894ca6f..74a2e4a 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIF.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIF.java @@ -19,12 +19,165 @@ package com.solace.connector.kafka.connect.source; +import com.solacesystems.common.util.ByteArray; import com.solacesystems.jcsmp.BytesXMLMessage; - +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.SDTException; +import com.solacesystems.jcsmp.SDTMap; +import com.solacesystems.jcsmp.Topic; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Date; +import org.apache.kafka.connect.data.SchemaAndValue; +import 
org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public interface SolMessageProcessorIF { + + Logger log = LoggerFactory.getLogger(SolMessageProcessorIF.class); + SolMessageProcessorIF process(String skey, BytesXMLMessage message); SourceRecord[] getRecords(String kafkaTopic); + + default ConnectHeaders userPropertiesToKafkaHeaders(BytesXMLMessage message) { + final ConnectHeaders headers = new ConnectHeaders(); + final SDTMap userProperties = message.getProperties(); + + if (userProperties != null) { + for (String key : userProperties.keySet()) { + try { + Object value = userProperties.get(key); + if (value == null) { + headers.add(key, SchemaAndValue.NULL); + } else if (value instanceof String) { + headers.addString(key, (String) value); + } else if (value instanceof Boolean) { + headers.addBoolean(key, (Boolean) value); + } else if (value instanceof byte[]) { + headers.addBytes(key, (byte[]) value); + } else if (value instanceof ByteArray) { + headers.addBytes(key, ((ByteArray) value).asBytes()); + } else if (value instanceof Byte) { + headers.addByte(key, (byte) value); + } else if (value instanceof Integer) { + headers.addInt(key, (Integer) value); + } else if (value instanceof Short) { + headers.addShort(key, (Short) value); + } else if (value instanceof Long) { + headers.addLong(key, (Long) value); + } else if (value instanceof Double) { + headers.addDouble(key, (Double) value); + } else if (value instanceof Float) { + headers.addFloat(key, (Float) value); + } else if (value instanceof BigDecimal) { + headers.addDecimal(key, (BigDecimal) value); + } else if (value instanceof BigInteger) { + headers.addDecimal(key, new BigDecimal((BigInteger) value)); + } else if (value instanceof Date) { + headers.addDate(key, (Date) value); + } else if (value instanceof Character) { + headers.addString(key, ((Character) value).toString()); + } else if (value instanceof Destination) { + if (log.isTraceEnabled()) { + log.trace( + String.format("Extracting destination name from user property %s", key)); + } + String destinationName = ((Destination) value).getName(); + headers.addString(key, destinationName); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Ignoring user property with key [%s] and type [%s]", key, + value.getClass().getName())); + } + } + } catch (SDTException e) { + log.error(String.format("Ignoring user property with key [%s].", key), e); + } + } + } + + return headers; + } + + default ConnectHeaders solacePropertiesToKafkaHeaders(BytesXMLMessage msg) { + final ConnectHeaders headers = new ConnectHeaders(); + if (msg.getApplicationMessageId() != null) { + headers.addString(SolaceSourceConstants.SOL_SH_APPLICATION_MESSAGE_ID, + msg.getApplicationMessageId()); + } + + if (msg.getApplicationMessageType() != null) { + headers.addString(SolaceSourceConstants.SOL_SH_APPLICATION_MESSAGE_TYPE, + msg.getApplicationMessageType()); + } + + if (msg.getCorrelationId() != null) { + headers.addString(SolaceSourceConstants.SOL_SH_CORRELATION_ID, msg.getCorrelationId()); + } + + if (msg.getCos() != null) { + headers.addInt(SolaceSourceConstants.SOL_SH_COS, msg.getCos().value()); + } + + if (msg.getDeliveryMode() != null) { + headers.addString(SolaceSourceConstants.SOL_SH_DELIVERY_MODE, msg.getDeliveryMode().name()); + } + + if (msg.getDestination() != null) { + headers.addString(SolaceSourceConstants.SOL_SH_DESTINATION, msg.getDestination().getName()); + } + + if 
(msg.getReplyTo() != null) { + Destination replyToDestination = msg.getReplyTo(); + headers.addString(SolaceSourceConstants.SOL_SH_REPLY_TO_DESTINATION, + replyToDestination.getName()); + String destinationType = replyToDestination instanceof Topic ? "topic" : "queue"; + headers.addString(SolaceSourceConstants.SOL_SH_REPLY_TO_DESTINATION_TYPE, + destinationType); + } + + if (msg.getSenderId() != null) { + headers.addString(SolaceSourceConstants.SOL_SH_SENDER_ID, msg.getSenderId()); + } + + if (msg.getSenderTimestamp() != null) { + headers.addLong(SolaceSourceConstants.SOL_SH_SENDER_TIMESTAMP, msg.getSenderTimestamp()); + } + + if (msg.getTimeToLive() > 0) { + headers.addLong(SolaceSourceConstants.SOL_SH_TIME_TO_LIVE, msg.getTimeToLive()); + } + + if (msg.getExpiration() > 0) { + headers.addLong(SolaceSourceConstants.SOL_SH_EXPIRATION, msg.getExpiration()); + } + + if (msg.getHTTPContentEncoding() != null) { + headers.addString(SolaceSourceConstants.SOL_SH_HTTP_CONTENT_ENCODING, + msg.getHTTPContentEncoding()); + } + + if (msg.getHTTPContentType() != null) { + headers.addString(SolaceSourceConstants.SOL_SH_HTTP_CONTENT_TYPE, + msg.getHTTPContentType()); + } + + if (msg.getSequenceNumber() != null) { + headers.addLong(SolaceSourceConstants.SOL_SH_SEQUENCE_NUMBER, msg.getSequenceNumber()); + } + + headers.addInt(SolaceSourceConstants.SOL_SH_PRIORITY, msg.getPriority()); + headers.addLong(SolaceSourceConstants.SOL_SH_RECEIVE_TIMESTAMP, msg.getReceiveTimestamp()); + + headers.addBoolean(SolaceSourceConstants.SOL_SH_REDELIVERED, msg.getRedelivered()); + headers.addBoolean(SolaceSourceConstants.SOL_SH_DISCARD_INDICATION, msg.getDiscardIndication()); + headers.addBoolean(SolaceSourceConstants.SOL_SH_IS_DMQ_ELIGIBLE, msg.isDMQEligible()); + headers.addBoolean(SolaceSourceConstants.SOL_SH_IS_ELIDING_ELIGIBLE, msg.isElidingEligible()); + headers.addBoolean(SolaceSourceConstants.SOL_SH_IS_REPLY_MESSAGE, msg.isReplyMessage()); + + return headers; + } } diff --git a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConnectorConfig.java b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConnectorConfig.java index 2ca25e8..5c1c2fd 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConnectorConfig.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConnectorConfig.java @@ -36,7 +36,7 @@ public class SolaceSourceConnectorConfig extends AbstractConfig { * Constructor to create Solace Configuration details for Source Connector. 
* @param properties the configuration properties */ - public SolaceSourceConnectorConfig(Map properties) { + public SolaceSourceConnectorConfig(Map properties) { super(config, properties); log.info("==================Initialize Connector properties"); @@ -254,7 +254,11 @@ public static ConfigDef solaceConfigDef() { .define(SolaceSourceConstants.SOL_KERBEROS_LOGIN_CONFIG, Type.STRING, "", Importance.LOW, "Location of the Kerberos Login Configuration File") .define(SolaceSourceConstants.SOL_KAFKA_MESSAGE_KEY, Type.STRING, "NONE", Importance.MEDIUM, - "This propert determines if a Kafka key record is created and the key to be used"); + "This property determines if a Kafka key record is created and the key to be used") + .define(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES, Type.BOOLEAN, false, Importance.MEDIUM, + "This property determines if Solace message user properties will be mapped to Kafka record headers") + .define(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES, Type.BOOLEAN, false, Importance.MEDIUM, + "This property determines if Solace message standard properties will be mapped to Kafka record headers"); } diff --git a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConstants.java b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConstants.java index d22247b..55be45a 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConstants.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConstants.java @@ -138,5 +138,33 @@ public class SolaceSourceConstants { public static final String SOL_KERBEROS_LOGIN_CONFIG = "sol.kerberos.login.conf"; public static final String SOL_KERBEROS_KRB5_CONFIG = "sol.kerberos.krb5.conf"; - + // Medium Importance Solace Message processor + public static final String SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES = "sol.message_processor.map_user_properties"; + public static final String SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES = "sol.message_processor.map_solace_standard_properties"; + + + //All SOL_SH prefixed constants are Solace Message Standard Headers. 
+ public static final String SOL_SH_APPLICATION_MESSAGE_ID = "solace_applicationMessageID"; + public static final String SOL_SH_APPLICATION_MESSAGE_TYPE = "solace_applicationMessageType"; + public static final String SOL_SH_CORRELATION_ID = "solace_correlationID"; + public static final String SOL_SH_COS = "solace_cos"; + public static final String SOL_SH_DELIVERY_MODE = "solace_deliveryMode"; + public static final String SOL_SH_DESTINATION = "solace_destination"; + public static final String SOL_SH_DISCARD_INDICATION = "solace_discardIndication"; + public static final String SOL_SH_EXPIRATION = "solace_expiration"; + public static final String SOL_SH_PRIORITY = "solace_priority"; + public static final String SOL_SH_RECEIVE_TIMESTAMP = "solace_receiveTimestamp"; + public static final String SOL_SH_REDELIVERED = "solace_redelivered"; + public static final String SOL_SH_REPLY_TO = "solace_replyTo"; + public static final String SOL_SH_REPLY_TO_DESTINATION_TYPE = "solace_replyToDestinationType"; + public static final String SOL_SH_REPLY_TO_DESTINATION = "solace_replyToDestination"; + public static final String SOL_SH_SENDER_ID = "solace_senderID"; + public static final String SOL_SH_SENDER_TIMESTAMP = "solace_senderTimestamp"; + public static final String SOL_SH_TIME_TO_LIVE = "solace_timeToLive"; + public static final String SOL_SH_IS_DMQ_ELIGIBLE = "solace_DMQEligible"; + public static final String SOL_SH_IS_ELIDING_ELIGIBLE = "solace_elidingEligible"; + public static final String SOL_SH_IS_REPLY_MESSAGE = "solace_replyMessage"; + public static final String SOL_SH_HTTP_CONTENT_ENCODING = "solace_httpContentEncoding"; + public static final String SOL_SH_HTTP_CONTENT_TYPE = "solace_httpContentType"; + public static final String SOL_SH_SEQUENCE_NUMBER = "solace_sequenceNumber"; } diff --git a/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java b/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java index 50dfe47..c65e7e2 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java @@ -19,60 +19,102 @@ package com.solace.connector.kafka.connect.source.msgprocessors; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES; import com.solace.connector.kafka.connect.source.SolMessageProcessorIF; import com.solacesystems.jcsmp.BytesXMLMessage; -//import com.solacesystems.jcsmp.DeliveryMode; import com.solacesystems.jcsmp.TextMessage; - -import java.nio.charset.Charset; - import java.nio.charset.StandardCharsets; - +import java.util.LinkedList; +import java.util.Map; +import org.apache.kafka.common.Configurable; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SolSampleSimpleMessageProcessor implements SolMessageProcessorIF { +public class SolSampleSimpleMessageProcessor implements SolMessageProcessorIF, Configurable { private static final Logger log = 
LoggerFactory.getLogger(SolSampleSimpleMessageProcessor.class); private Object smsg; private String skey; private Object sdestination; private byte[] messageOut; + private LinkedList
headers = new LinkedList<>(); + private Map configs; + private boolean mapUserProperties; + private boolean mapSolaceStandardProperties; + + @Override + public void configure(Map configs) { + this.configs = configs; + this.mapUserProperties = getBooleanConfigProperty(SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES); + this.mapSolaceStandardProperties = getBooleanConfigProperty( + SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES); + } @Override public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) { this.smsg = msg; + this.headers.clear(); + + if (log.isDebugEnabled()) { + log.debug("{} received.", msg.getClass().getName()); + } if (msg instanceof TextMessage) { - log.debug("Text Message received {}", ((TextMessage) msg).getText()); String smsg = ((TextMessage) msg).getText(); messageOut = smsg.getBytes(StandardCharsets.UTF_8); } else { - log.debug("Message payload: {}", new String(msg.getBytes(), Charset.defaultCharset())); if (msg.getBytes().length != 0) { // Binary XML pay load messageOut = msg.getBytes(); } else { // Binary attachment pay load messageOut = msg.getAttachmentByteBuffer().array(); } } - log.debug("Message Dump:{}", msg.dump()); this.sdestination = msg.getDestination().getName(); - log.debug("processing data for destination: {}; with message {}, with Kafka topic key of: {}", - (String) this.sdestination, msg, this.skey); + if (log.isDebugEnabled()) { + log.debug("processing data for destination: {}; with Kafka topic key of: {}", + this.sdestination, this.skey); + } this.skey = skey; this.smsg = messageOut; + + if (mapUserProperties) { + ConnectHeaders userProps = userPropertiesToKafkaHeaders(msg); + userProps.iterator().forEachRemaining(headers::add); + } + + if (mapSolaceStandardProperties) { + ConnectHeaders solaceProps = solacePropertiesToKafkaHeaders(msg); + solaceProps.iterator().forEachRemaining(headers::add); + } + return this; } @Override public SourceRecord[] getRecords(String kafkaTopic) { - - return new SourceRecord[] { - new SourceRecord(null, null, kafkaTopic, null, null, - null, Schema.BYTES_SCHEMA, smsg) }; + + return new SourceRecord[]{ + new SourceRecord(null, null, kafkaTopic, null, null, + null, Schema.BYTES_SCHEMA, smsg, (Long) null, headers)}; } + private boolean getBooleanConfigProperty(String name) { + if (this.configs != null && this.configs.containsKey(name)) { + final Object value = this.configs.get(name); + if (value instanceof String) { + return Boolean.parseBoolean((String) value); + } else if (value instanceof Boolean) { + return (boolean) value; + } else { + log.error("The value of property {} should be of type boolean or string.", name); + } + } + return false; + } } diff --git a/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java b/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java index 7c72abc..8440530 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java @@ -19,29 +19,34 @@ package com.solace.connector.kafka.connect.source.msgprocessors; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES; import 
com.solace.connector.kafka.connect.source.SolMessageProcessorIF; import com.solacesystems.jcsmp.BytesXMLMessage; import com.solacesystems.jcsmp.TextMessage; - -import java.nio.charset.Charset; - import java.nio.charset.StandardCharsets; - +import java.util.LinkedList; +import java.util.Map; +import org.apache.kafka.common.Configurable; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SolaceSampleKeyedMessageProcessor implements SolMessageProcessorIF { +public class SolaceSampleKeyedMessageProcessor implements SolMessageProcessorIF, Configurable { - private static final Logger log + private static final Logger log = LoggerFactory.getLogger(SolaceSampleKeyedMessageProcessor.class); private Object smsg; + private Object sdestination; private byte[] messageOut; private String skey; private BytesXMLMessage msg; private SchemaAndValue key; + private LinkedList
headers = new LinkedList<>(); public enum KeyHeader { NONE, DESTINATION, CORRELATION_ID, CORRELATION_ID_AS_BYTES @@ -49,27 +54,43 @@ public enum KeyHeader { protected KeyHeader keyheader = KeyHeader.NONE; + private Map configs; + private boolean mapUserProperties; + private boolean mapSolaceStandardProperties; + + @Override + public void configure(Map configs) { + this.configs = configs; + this.mapUserProperties = getBooleanConfigProperty(SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES); + this.mapSolaceStandardProperties = getBooleanConfigProperty( + SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES); + } + @Override public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) { this.msg = msg; + this.headers.clear(); this.skey = skey.toUpperCase(); + if (log.isDebugEnabled()) { + log.debug("{} received.", msg.getClass().getName()); + } if (msg instanceof TextMessage) { - log.debug("Text Mesasge received {}", ((TextMessage) msg).getText()); String smsg = ((TextMessage) msg).getText(); messageOut = smsg.getBytes(StandardCharsets.UTF_8); } else { - log.debug("Message payload: {}", new String(msg.getBytes(), Charset.defaultCharset())); if (msg.getBytes().length != 0) { // Binary XML pay load messageOut = msg.getBytes(); } else { // Binary attachment pay load messageOut = msg.getAttachmentByteBuffer().array(); } - } - log.debug("Message Dump:{}", msg.dump()); - log.debug("processing data for Kafka topic Key: {}; with message {}", skey, msg); + this.sdestination = msg.getDestination().getName(); + if (log.isDebugEnabled()) { + log.debug("processing data for destination: {}; with Kafka topic key of: {}", + this.sdestination, this.skey); + } this.smsg = messageOut; @@ -85,16 +106,26 @@ public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) { this.key = this.getKey(); + if (mapUserProperties) { + ConnectHeaders userProps = userPropertiesToKafkaHeaders(msg); + userProps.iterator().forEachRemaining(headers::add); + } + + if (mapSolaceStandardProperties) { + ConnectHeaders solaceProps = solacePropertiesToKafkaHeaders(msg); + solaceProps.iterator().forEachRemaining(headers::add); + } return this; } @Override public SourceRecord[] getRecords(String kafkaTopic) { - - log.debug("=======Key Schema: {}, Key Value: {}", this.key.schema(), this.key.value()); - return new SourceRecord[] { new SourceRecord(null, null, kafkaTopic, + if (log.isDebugEnabled()) { + log.debug("=======Key Schema: {}, Key Value: {}", this.key.schema(), this.key.value()); + } + return new SourceRecord[]{new SourceRecord(null, null, kafkaTopic, null, this.key.schema(), this.key.value(), - Schema.BYTES_SCHEMA, smsg) }; + Schema.BYTES_SCHEMA, smsg, null, headers)}; } SchemaAndValue getKey() { @@ -129,4 +160,17 @@ SchemaAndValue getKey() { return new SchemaAndValue(keySchema, key); } + private boolean getBooleanConfigProperty(String name) { + if (this.configs != null && this.configs.containsKey(name)) { + final Object value = this.configs.get(name); + if (value instanceof String) { + return Boolean.parseBoolean((String) value); + } else if (value instanceof Boolean) { + return (boolean) value; + } else { + log.error("The value of property {} should be of type boolean or string.", name); + } + } + return false; + } } diff --git a/src/test/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIFTest.java b/src/test/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIFTest.java new file mode 100644 index 0000000..318855b --- /dev/null +++ 
b/src/test/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIFTest.java @@ -0,0 +1,139 @@ +package com.solace.connector.kafka.connect.source; + +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_SH_APPLICATION_MESSAGE_ID; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_SH_APPLICATION_MESSAGE_TYPE; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_SH_CORRELATION_ID; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_SH_COS; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_SH_DELIVERY_MODE; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_SH_DESTINATION; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_SH_REPLY_TO_DESTINATION; +import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_SH_REPLY_TO_DESTINATION_TYPE; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import com.solace.connector.kafka.connect.source.msgprocessors.SolSampleSimpleMessageProcessor; +import com.solacesystems.common.util.ByteArray; +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.SDTException; +import com.solacesystems.jcsmp.SDTMap; +import com.solacesystems.jcsmp.TextMessage; +import com.solacesystems.jcsmp.User_Cos; +import com.solacesystems.jcsmp.impl.RawSMFMessageImpl; +import java.math.BigInteger; +import java.util.UUID; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class SolMessageProcessorIFTest { + + private SolMessageProcessorIF messageProcessor; + + @BeforeEach + void setUp() { + messageProcessor = new SolSampleSimpleMessageProcessor(); + } + + @Test + void testUserPropertiesMappingGivenNullUserPropertyMap() { + final BytesXMLMessage message = mock(TextMessage.class); + when(message.getProperties()).thenReturn(null); + + ConnectHeaders kafkaHeaders = messageProcessor.userPropertiesToKafkaHeaders(message); + assertThat("getProperties() is null", kafkaHeaders.isEmpty()); + } + + @Test + void testUserPropertiesMappingGiveEmptyUserPropertyMap() { + final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap(); + final BytesXMLMessage message = mock(TextMessage.class); + when(message.getProperties()).thenReturn(solMsgUserProperties); + + ConnectHeaders kafkaHeaders = messageProcessor.userPropertiesToKafkaHeaders(message); + assertThat("solMsgUserProperties is empty", kafkaHeaders.isEmpty()); + } + + @Test + void testUserPropertiesMappingForGivenUserPropertyMap() throws SDTException { + final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap(); + solMsgUserProperties.putObject("null-value-user-property", null); + solMsgUserProperties.putBoolean("boolean-user-property", true); + solMsgUserProperties.putCharacter("char-user-property", 'C'); + solMsgUserProperties.putDouble("double-user-property", 123.4567); + solMsgUserProperties.putFloat("float-user-property", 000.1f); + solMsgUserProperties.putInteger("int-user-property", 1); + solMsgUserProperties.putLong("long-user-property", 10000L); + solMsgUserProperties.putShort("short-user-property", Short.valueOf("20")); + 
solMsgUserProperties.putString("string-user-property", "value1"); + solMsgUserProperties.putObject("bigInteger-user-property", new BigInteger("123456")); + solMsgUserProperties.putByte("byte-user-property", "A".getBytes()[0]); + solMsgUserProperties.putBytes("bytes-user-property", "Hello".getBytes()); + solMsgUserProperties.putByteArray("byteArray-user-property", + new ByteArray("Hello World".getBytes())); + solMsgUserProperties.putDestination("topic-user-property", + JCSMPFactory.onlyInstance().createTopic("testTopic")); + solMsgUserProperties.putDestination("queue-user-property", + JCSMPFactory.onlyInstance().createQueue("testQueue")); + + final BytesXMLMessage message = mock(TextMessage.class); + when(message.getProperties()).thenReturn(solMsgUserProperties); + + final ConnectHeaders kafkaHeaders = messageProcessor.userPropertiesToKafkaHeaders(message); + assertThat(kafkaHeaders.size(), equalTo(message.getProperties().size())); + + kafkaHeaders.iterator().forEachRemaining( + header -> assertThat(header.key(), solMsgUserProperties.containsKey(header.key()))); + } + + @Test + void testUserPropertiesMappingWhenGivenPropertyOfUnsupportedTypes() + throws SDTException { + final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap(); + solMsgUserProperties.putMap("map-user-property", JCSMPFactory.onlyInstance().createMap()); + solMsgUserProperties.putStream("stream-user-property", JCSMPFactory.onlyInstance().createStream()); + solMsgUserProperties.putMessage("raw-message-user-property", + new RawSMFMessageImpl(new ByteArray("hello".getBytes()))); + + final BytesXMLMessage message = mock(TextMessage.class); + when(message.getProperties()).thenReturn(solMsgUserProperties); + + ConnectHeaders kafkaHeaders = messageProcessor.userPropertiesToKafkaHeaders(message); + assertThat(solMsgUserProperties.size(), equalTo(message.getProperties().size())); + assertThat(kafkaHeaders.size(), equalTo(0)); + } + + @Test + void testSolaceStandardPropertiesMappingGivenSolaceMessage() { + final BytesXMLMessage message = mock(TextMessage.class); + when(message.getApplicationMessageId()).thenReturn(UUID.randomUUID().toString()); + when(message.getApplicationMessageType()).thenReturn("testMessageType"); + when(message.getCorrelationId()).thenReturn(UUID.randomUUID().toString()); + when(message.getCos()).thenReturn(User_Cos.USER_COS_1); + when(message.getDestination()).thenReturn(JCSMPFactory.onlyInstance().createQueue("testQueue")); + when(message.getDeliveryMode()).thenReturn(DeliveryMode.PERSISTENT); + when(message.getReplyTo()).thenReturn(JCSMPFactory.onlyInstance().createQueue("testQueue")); + + ConnectHeaders kafkaHeaders = messageProcessor.solacePropertiesToKafkaHeaders(message); + assertThat("kafkaHeaders should not be empty", !kafkaHeaders.isEmpty()); + assertThat(message.getApplicationMessageId(), + equalTo(kafkaHeaders.lastWithName(SOL_SH_APPLICATION_MESSAGE_ID).value())); + assertThat(message.getApplicationMessageType(), + equalTo(kafkaHeaders.lastWithName(SOL_SH_APPLICATION_MESSAGE_TYPE).value())); + assertThat(message.getCorrelationId(), + equalTo(kafkaHeaders.lastWithName(SOL_SH_CORRELATION_ID).value())); + assertThat(message.getCos().value(), + equalTo(kafkaHeaders.lastWithName(SOL_SH_COS).value())); + assertThat(message.getDestination().getName(), + equalTo(kafkaHeaders.lastWithName(SOL_SH_DESTINATION).value())); + assertThat(message.getDeliveryMode().name(), + equalTo(kafkaHeaders.lastWithName(SOL_SH_DELIVERY_MODE).value())); + + assertThat(message.getReplyTo().getName(), + 
equalTo(kafkaHeaders.lastWithName(SOL_SH_REPLY_TO_DESTINATION).value())); + assertThat("queue", + equalTo(kafkaHeaders.lastWithName(SOL_SH_REPLY_TO_DESTINATION_TYPE).value())); + } +} \ No newline at end of file
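
For reference, below is a minimal sketch of a custom message processor that always forwards both kinds of headers by calling the new `userPropertiesToKafkaHeaders(...)` and `solacePropertiesToKafkaHeaders(...)` default methods added in this change set. It is illustrative only and not part of this PR: the class and package names are hypothetical, and the payload handling is a simplified mirror of what the bundled sample processors do.

```java
// Hypothetical example; class and package names are illustrative, not part of this PR.
package com.example.kafka.connect.source;

import com.solace.connector.kafka.connect.source.SolMessageProcessorIF;
import com.solacesystems.jcsmp.BytesXMLMessage;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;

public class HeaderForwardingMessageProcessor implements SolMessageProcessorIF {

  private byte[] payload;
  private ConnectHeaders headers;

  @Override
  public SolMessageProcessorIF process(String skey, BytesXMLMessage message) {
    // Simplified payload handling, mirroring the bundled samples:
    // use the binary XML payload if present, otherwise the binary attachment.
    payload = message.getBytes().length != 0
        ? message.getBytes()
        : message.getAttachmentByteBuffer().array();

    // Default methods added to SolMessageProcessorIF in this change set.
    headers = userPropertiesToKafkaHeaders(message);   // user properties, keys unchanged
    solacePropertiesToKafkaHeaders(message)            // standard properties, "solace_"-prefixed keys
        .forEach(headers::add);
    return this;
  }

  @Override
  public SourceRecord[] getRecords(String kafkaTopic) {
    return new SourceRecord[] {
        new SourceRecord(null, null, kafkaTopic, null,
            null, null, Schema.BYTES_SCHEMA, payload, (Long) null, headers)
    };
  }
}
```

As with the bundled samples, such a processor would be selected through the connector's `sol.message_processor_class` setting; user-property headers keep their original keys, while standard-property headers use the `solace_`-prefixed names defined in `SolaceSourceConstants`.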