Skip to content

Commit

Permalink
DATAGO-67107 Added support for mapping/forwarding Solace message user…
Browse files Browse the repository at this point in the history
… properties to Kafka record headers

- Added two new configuration parameters: sol.message_processor.map_user_properties and
sol.message_processor.map_solace_standard_properties
- Updated tests, sample configuration and docs
  • Loading branch information
mayur-solace committed Feb 2, 2024
1 parent 7de1f3c commit cff498f
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.solacesystems.common.util.ByteArray;
import com.solacesystems.jcsmp.BytesMessage;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Message;
Expand All @@ -32,7 +33,6 @@
import com.solacesystems.jcsmp.impl.AbstractDestination;
import com.solacesystems.jcsmp.impl.QueueImpl;
import com.solacesystems.jcsmp.impl.TopicImpl;
import com.solacesystems.jcsmp.impl.sdt.MapImpl;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Duration;
Expand Down Expand Up @@ -668,7 +668,7 @@ void testFailPubSubConnection(KafkaContext kafkaContext) {
}

private SDTMap getTestUserProperties() throws SDTException {
final SDTMap solMsgUserProperties = new MapImpl();
final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap();
solMsgUserProperties.putObject("null-value-user-property", null);
solMsgUserProperties.putBoolean("boolean-user-property", true);
solMsgUserProperties.putCharacter("char-user-property", 'C');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,57 +44,57 @@ public interface SolMessageProcessorIF {

default ConnectHeaders userPropertiesToKafkaHeaders(BytesXMLMessage message) {
final ConnectHeaders headers = new ConnectHeaders();
final SDTMap msgUserProperties = message.getProperties();
final SDTMap userProperties = message.getProperties();

if (msgUserProperties != null) {
for (String propKey : msgUserProperties.keySet()) {
if (userProperties != null) {
for (String key : userProperties.keySet()) {
try {
Object propValue = msgUserProperties.get(propKey);
if (propValue == null) {
headers.add(propKey, SchemaAndValue.NULL);
} else if (propValue instanceof String) {
headers.addString(propKey, (String) propValue);
} else if (propValue instanceof Boolean) {
headers.addBoolean(propKey, (Boolean) propValue);
} else if (propValue instanceof byte[]) {
headers.addBytes(propKey, (byte[]) propValue);
} else if (propValue instanceof ByteArray) {
headers.addBytes(propKey, ((ByteArray) propValue).asBytes());
} else if (propValue instanceof Byte) {
headers.addByte(propKey, (byte) propValue);
} else if (propValue instanceof Integer) {
headers.addInt(propKey, (Integer) propValue);
} else if (propValue instanceof Short) {
headers.addShort(propKey, (Short) propValue);
} else if (propValue instanceof Long) {
headers.addLong(propKey, (Long) propValue);
} else if (propValue instanceof Double) {
headers.addDouble(propKey, (Double) propValue);
} else if (propValue instanceof Float) {
headers.addFloat(propKey, (Float) propValue);
} else if (propValue instanceof BigDecimal) {
headers.addDecimal(propKey, (BigDecimal) propValue);
} else if (propValue instanceof BigInteger) {
headers.addDecimal(propKey, new BigDecimal((BigInteger) propValue));
} else if (propValue instanceof Date) {
headers.addDate(propKey, (Date) propValue);
} else if (propValue instanceof Character) {
headers.addString(propKey, ((Character) propValue).toString());
} else if (propValue instanceof Destination) {
Object value = userProperties.get(key);
if (value == null) {
headers.add(key, SchemaAndValue.NULL);
} else if (value instanceof String) {
headers.addString(key, (String) value);
} else if (value instanceof Boolean) {
headers.addBoolean(key, (Boolean) value);
} else if (value instanceof byte[]) {
headers.addBytes(key, (byte[]) value);
} else if (value instanceof ByteArray) {
headers.addBytes(key, ((ByteArray) value).asBytes());
} else if (value instanceof Byte) {
headers.addByte(key, (byte) value);
} else if (value instanceof Integer) {
headers.addInt(key, (Integer) value);
} else if (value instanceof Short) {
headers.addShort(key, (Short) value);
} else if (value instanceof Long) {
headers.addLong(key, (Long) value);
} else if (value instanceof Double) {
headers.addDouble(key, (Double) value);
} else if (value instanceof Float) {
headers.addFloat(key, (Float) value);
} else if (value instanceof BigDecimal) {
headers.addDecimal(key, (BigDecimal) value);
} else if (value instanceof BigInteger) {
headers.addDecimal(key, new BigDecimal((BigInteger) value));
} else if (value instanceof Date) {
headers.addDate(key, (Date) value);
} else if (value instanceof Character) {
headers.addString(key, ((Character) value).toString());
} else if (value instanceof Destination) {
if (log.isTraceEnabled()) {
log.trace(
String.format("Extracting destination name from user property %s", propKey));
String.format("Extracting destination name from user property %s", key));
}
String destinationName = ((Destination) propValue).getName();
headers.addString(propKey, destinationName);
String destinationName = ((Destination) value).getName();
headers.addString(key, destinationName);
} else {
if (log.isInfoEnabled()) {
log.info(String.format("Ignoring user property with key [%s] and type [%s]", propKey,
propValue.getClass().getName()));
if (log.isDebugEnabled()) {
log.debug(String.format("Ignoring user property with key [%s] and type [%s]", key,
value.getClass().getName()));
}
}
} catch (SDTException e) {
log.error(String.format("Ignoring user property with key [%s].", propKey), e);
log.error(String.format("Ignoring user property with key [%s].", key), e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.solace.connector.kafka.connect.source.SolMessageProcessorIF;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.TextMessage;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -61,16 +60,14 @@ public void configure(Map<String, ?> configs) {
public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) {
this.smsg = msg;
this.headers.clear();

if (log.isDebugEnabled()) {
log.debug("{} received.", msg.getClass().getName());
}
if (msg instanceof TextMessage) {
if (log.isDebugEnabled()) {
log.debug("Text Message received {}", ((TextMessage) msg).getText());
}
String smsg = ((TextMessage) msg).getText();
messageOut = smsg.getBytes(StandardCharsets.UTF_8);
} else {
if (log.isDebugEnabled()) {
log.debug("Message payload: {}", new String(msg.getBytes(), Charset.defaultCharset()));
}
if (msg.getBytes().length != 0) { // Binary XML pay load
messageOut = msg.getBytes();
} else { // Binary attachment pay load
Expand All @@ -80,8 +77,8 @@ public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) {

this.sdestination = msg.getDestination().getName();
if (log.isDebugEnabled()) {
log.debug("processing data for destination: {}; with message {}, with Kafka topic key of: {}",
(String) this.sdestination, msg, this.skey);
log.debug("processing data for destination: {}; with Kafka topic key of: {}",
this.sdestination, this.skey);
}
this.skey = skey;
this.smsg = messageOut;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.solace.connector.kafka.connect.source.SolMessageProcessorIF;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.TextMessage;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.Map;
Expand All @@ -42,6 +41,7 @@ public class SolaceSampleKeyedMessageProcessor implements SolMessageProcessorIF,
private static final Logger log
= LoggerFactory.getLogger(SolaceSampleKeyedMessageProcessor.class);
private Object smsg;
private Object sdestination;
private byte[] messageOut;
private String skey;
private BytesXMLMessage msg;
Expand Down Expand Up @@ -72,26 +72,24 @@ public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) {
this.headers.clear();
this.skey = skey.toUpperCase();

if (log.isDebugEnabled()) {
log.debug("{} received.", msg.getClass().getName());
}
if (msg instanceof TextMessage) {
if (log.isDebugEnabled()) {
log.debug("Text Message received {}", ((TextMessage) msg).getText());
}
String smsg = ((TextMessage) msg).getText();
messageOut = smsg.getBytes(StandardCharsets.UTF_8);
} else {
if (log.isDebugEnabled()) {
log.debug("Message payload: {}", new String(msg.getBytes(), Charset.defaultCharset()));
}
if (msg.getBytes().length != 0) { // Binary XML pay load
messageOut = msg.getBytes();
} else { // Binary attachment pay load
messageOut = msg.getAttachmentByteBuffer().array();
}

}

this.sdestination = msg.getDestination().getName();
if (log.isDebugEnabled()) {
log.debug("processing data for Kafka topic Key: {}; with message {}", skey, msg);
log.debug("processing data for destination: {}; with Kafka topic key of: {}",
this.sdestination, this.skey);
}

this.smsg = messageOut;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
import com.solacesystems.common.util.ByteArray;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.SDTException;
import com.solacesystems.jcsmp.SDTMap;
import com.solacesystems.jcsmp.TextMessage;
import com.solacesystems.jcsmp.User_Cos;
import com.solacesystems.jcsmp.impl.QueueImpl;
import com.solacesystems.jcsmp.impl.RawSMFMessageImpl;
import com.solacesystems.jcsmp.impl.TopicImpl;
import com.solacesystems.jcsmp.impl.sdt.MapImpl;
import com.solacesystems.jcsmp.impl.sdt.StreamImpl;
import java.math.BigInteger;
import java.util.UUID;
import org.apache.kafka.connect.header.ConnectHeaders;
Expand All @@ -51,7 +50,7 @@ void testUserPropertiesMappingGivenNullUserPropertyMap() {

@Test
void testUserPropertiesMappingGiveEmptyUserPropertyMap() {
final SDTMap solMsgUserProperties = new MapImpl();
final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap();
final BytesXMLMessage message = mock(TextMessage.class);
when(message.getProperties()).thenReturn(solMsgUserProperties);

Expand All @@ -61,7 +60,7 @@ void testUserPropertiesMappingGiveEmptyUserPropertyMap() {

@Test
void testUserPropertiesMappingForGivenUserPropertyMap() throws SDTException {
final SDTMap solMsgUserProperties = new MapImpl();
final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap();
solMsgUserProperties.putObject("null-value-user-property", null);
solMsgUserProperties.putBoolean("boolean-user-property", true);
solMsgUserProperties.putCharacter("char-user-property", 'C');
Expand Down Expand Up @@ -94,9 +93,9 @@ void testUserPropertiesMappingForGivenUserPropertyMap() throws SDTException {
@Test
void testUserPropertiesMappingWhenGivenPropertyOfUnsupportedTypes()
throws SDTException {
final SDTMap solMsgUserProperties = new MapImpl();
solMsgUserProperties.putMap("map-user-property", new MapImpl());
solMsgUserProperties.putStream("stream-user-property", new StreamImpl());
final SDTMap solMsgUserProperties = JCSMPFactory.onlyInstance().createMap();
solMsgUserProperties.putMap("map-user-property", JCSMPFactory.onlyInstance().createMap());
solMsgUserProperties.putStream("stream-user-property", JCSMPFactory.onlyInstance().createStream());
solMsgUserProperties.putMessage("raw-message-user-property",
new RawSMFMessageImpl(new ByteArray("hello".getBytes())));

Expand Down

0 comments on commit cff498f

Please sign in to comment.