DATAGO-67107 Added support for forwarding Solace message user properties to Kafka record headers #71

Merged (4 commits) on Feb 2, 2024
10 changes: 9 additions & 1 deletion README.md
@@ -156,7 +156,7 @@ In this case the IP address is one of the nodes running the distributed mode wor
{
"class": "com.solace.connector.kafka.connect.source.SolaceSourceConnector",
"type": "source",
"version": "3.0.1"
"version": "3.1.0"
},
```

@@ -374,6 +374,14 @@ For reference, this project includes two examples which you can use as starting
* [SolSampleSimpleMessageProcessor](/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java)
* [SolaceSampleKeyedMessageProcessor](/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java)

By default, the two processors above do not map/forward Solace message user properties or Solace standard properties. To forward them as Kafka record headers, set the following two properties to `true` in the connector configuration. See the sample configuration [here](/etc/solace_source_properties.json) and the [Parameters](#parameters) section for details.

```
sol.message_processor.map_user_properties=true
sol.message_processor.map_solace_standard_properties=true
```
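
If a downstream consumer needs these headers, it can read them from the Kafka record. A minimal consumer-side sketch (hypothetical class, assuming `sol.message_processor.map_solace_standard_properties=true` on the connector and Connect's default header converter on the producing side):

```
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class SolaceHeaderReader {
  // Returns the forwarded Solace correlation ID, or null if the header is absent.
  public static String correlationId(ConsumerRecord<String, byte[]> record) {
    Header header = record.headers().lastHeader("solace_correlationID");
    // With Connect's default SimpleHeaderConverter, string-typed headers arrive as UTF-8 bytes.
    return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
  }
}
```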


Once you've built the jar file for your custom message processor project, place it into the same directory as this connector, and update the connector's `sol.message_processor_class` config to point to the class of your new message processor.
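
A minimal custom processor might look like the following sketch (hypothetical class and package names); it reuses the default header-mapping methods this PR adds to `SolMessageProcessorIF`:

```
package com.example.msgprocessors; // hypothetical package

import com.solace.connector.kafka.connect.source.SolMessageProcessorIF;
import com.solacesystems.jcsmp.BytesXMLMessage;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;

public class MyMessageProcessor implements SolMessageProcessorIF {

  private String key;
  private byte[] payload;
  private ConnectHeaders headers = new ConnectHeaders();

  @Override
  public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) {
    this.key = skey;
    // Binary XML payload if present, otherwise the binary attachment payload.
    this.payload = msg.getBytes().length != 0
        ? msg.getBytes()
        : msg.getAttachmentByteBuffer().array();
    // Reuse the default header-mapping helpers from SolMessageProcessorIF.
    this.headers = userPropertiesToKafkaHeaders(msg);
    solacePropertiesToKafkaHeaders(msg).forEach(this.headers::add);
    return this;
  }

  @Override
  public SourceRecord[] getRecords(String kafkaTopic) {
    return new SourceRecord[] {
        new SourceRecord(null, null, kafkaTopic, null,
            Schema.OPTIONAL_STRING_SCHEMA, key, Schema.BYTES_SCHEMA, payload,
            (Long) null, headers)
    };
  }
}
```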

More information on Kafka source connector development can be found here:
7 changes: 7 additions & 0 deletions etc/solace_source.properties
@@ -35,6 +35,13 @@ sol.message_processor_class=com.solace.connector.kafka.connect.source.msgprocess
# If enabled, messages that throw message processor errors will be discarded.
#sol.message_processor.error.ignore=false

# If enabled, maps/forwards the Solace message's user properties map to Kafka record headers
#sol.message_processor.map_user_properties=false

# If enabled, maps/forwards the Solace message's standard properties (e.g. correlationId, applicationMessageId, redelivered, dmqEligible, COS, etc.) to Kafka record headers
# The standard property names are prefixed with "solace_" (e.g. correlationId becomes solace_correlationID) in the Kafka record headers
#sol.message_processor.map_solace_standard_properties=false

# When using SolaceSampleKeyedMessageProcessor, defines which part of a
# PubSub+ message shall be converted to a Kafka record key
# Allowable values include: NONE, DESTINATION, CORRELATION_ID, CORRELATION_ID_AS_BYTES
2 changes: 2 additions & 0 deletions etc/solace_source_properties.json
@@ -11,6 +11,8 @@
"sol.vpn_name": "default",
"sol.topics": "sourcetest",
"sol.message_processor_class": "com.solace.connector.kafka.connect.source.msgprocessors.SolSampleSimpleMessageProcessor",
"sol.message_processor.map_user_properties": "false",
"sol.message_processor.map_solace_standard_properties": "false",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter" }
}
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,2 +1,2 @@
group=com.solace.connector.kafka.connect
version=3.0.1
version=3.1.0

Large diffs are not rendered by default.

src/main/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIF.java
@@ -19,12 +19,165 @@

package com.solace.connector.kafka.connect.source;

import com.solacesystems.common.util.ByteArray;
import com.solacesystems.jcsmp.BytesXMLMessage;

import com.solacesystems.jcsmp.Destination;
import com.solacesystems.jcsmp.SDTException;
import com.solacesystems.jcsmp.SDTMap;
import com.solacesystems.jcsmp.Topic;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Date;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface SolMessageProcessorIF {

Logger log = LoggerFactory.getLogger(SolMessageProcessorIF.class);

SolMessageProcessorIF process(String skey, BytesXMLMessage message);

SourceRecord[] getRecords(String kafkaTopic);

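/**
 * Maps the entries of the Solace message's user-property SDTMap to Kafka Connect headers,
 * converting each supported SDT type to the closest Connect header type. Entries of
 * unsupported types are skipped with a debug log, and SDTException failures are logged and ignored.
 */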
default ConnectHeaders userPropertiesToKafkaHeaders(BytesXMLMessage message) {
final ConnectHeaders headers = new ConnectHeaders();
final SDTMap userProperties = message.getProperties();

if (userProperties != null) {
for (String key : userProperties.keySet()) {
try {
Object value = userProperties.get(key);
if (value == null) {
headers.add(key, SchemaAndValue.NULL);
} else if (value instanceof String) {
headers.addString(key, (String) value);
} else if (value instanceof Boolean) {
headers.addBoolean(key, (Boolean) value);
} else if (value instanceof byte[]) {
headers.addBytes(key, (byte[]) value);
} else if (value instanceof ByteArray) {
headers.addBytes(key, ((ByteArray) value).asBytes());
} else if (value instanceof Byte) {
headers.addByte(key, (byte) value);
} else if (value instanceof Integer) {
headers.addInt(key, (Integer) value);
} else if (value instanceof Short) {
headers.addShort(key, (Short) value);
} else if (value instanceof Long) {
headers.addLong(key, (Long) value);
} else if (value instanceof Double) {
headers.addDouble(key, (Double) value);
} else if (value instanceof Float) {
headers.addFloat(key, (Float) value);
} else if (value instanceof BigDecimal) {
headers.addDecimal(key, (BigDecimal) value);
} else if (value instanceof BigInteger) {
headers.addDecimal(key, new BigDecimal((BigInteger) value));
} else if (value instanceof Date) {
headers.addDate(key, (Date) value);
} else if (value instanceof Character) {
headers.addString(key, ((Character) value).toString());
} else if (value instanceof Destination) {
if (log.isTraceEnabled()) {
log.trace(
String.format("Extracting destination name from user property %s", key));
}
String destinationName = ((Destination) value).getName();
headers.addString(key, destinationName);
} else {
if (log.isDebugEnabled()) {
log.debug(String.format("Ignoring user property with key [%s] and type [%s]", key,
value.getClass().getName()));
}
}
} catch (SDTException e) {
log.error(String.format("Ignoring user property with key [%s].", key), e);
}
}
}

return headers;
}

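/**
 * Maps standard Solace message properties (application message ID, correlation ID, destination,
 * delivery mode, timestamps, flags, etc.) to Kafka Connect headers using the "solace_"-prefixed
 * names defined in SolaceSourceConstants. Optional properties are added only when present.
 */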
default ConnectHeaders solacePropertiesToKafkaHeaders(BytesXMLMessage msg) {
final ConnectHeaders headers = new ConnectHeaders();
if (msg.getApplicationMessageId() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_APPLICATION_MESSAGE_ID,
msg.getApplicationMessageId());
}

if (msg.getApplicationMessageType() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_APPLICATION_MESSAGE_TYPE,
msg.getApplicationMessageType());
}

if (msg.getCorrelationId() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_CORRELATION_ID, msg.getCorrelationId());
}

if (msg.getCos() != null) {
headers.addInt(SolaceSourceConstants.SOL_SH_COS, msg.getCos().value());
}

if (msg.getDeliveryMode() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_DELIVERY_MODE, msg.getDeliveryMode().name());
}

if (msg.getDestination() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_DESTINATION, msg.getDestination().getName());
}

if (msg.getReplyTo() != null) {
Destination replyToDestination = msg.getReplyTo();
headers.addString(SolaceSourceConstants.SOL_SH_REPLY_TO_DESTINATION,
replyToDestination.getName());
String destinationType = replyToDestination instanceof Topic ? "topic" : "queue";
headers.addString(SolaceSourceConstants.SOL_SH_REPLY_TO_DESTINATION_TYPE,
destinationType);
}

if (msg.getSenderId() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_SENDER_ID, msg.getSenderId());
}

if (msg.getSenderTimestamp() != null) {
headers.addLong(SolaceSourceConstants.SOL_SH_SENDER_TIMESTAMP, msg.getSenderTimestamp());
}

if (msg.getTimeToLive() > 0) {
headers.addLong(SolaceSourceConstants.SOL_SH_TIME_TO_LIVE, msg.getTimeToLive());
}

if (msg.getExpiration() > 0) {
headers.addLong(SolaceSourceConstants.SOL_SH_EXPIRATION, msg.getExpiration());
}

if (msg.getHTTPContentEncoding() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_HTTP_CONTENT_ENCODING,
msg.getHTTPContentEncoding());
}

if (msg.getHTTPContentType() != null) {
headers.addString(SolaceSourceConstants.SOL_SH_HTTP_CONTENT_TYPE,
msg.getHTTPContentType());
}

if (msg.getSequenceNumber() != null) {
headers.addLong(SolaceSourceConstants.SOL_SH_SEQUENCE_NUMBER, msg.getSequenceNumber());
}

headers.addInt(SolaceSourceConstants.SOL_SH_PRIORITY, msg.getPriority());
headers.addLong(SolaceSourceConstants.SOL_SH_RECEIVE_TIMESTAMP, msg.getReceiveTimestamp());

headers.addBoolean(SolaceSourceConstants.SOL_SH_REDELIVERED, msg.getRedelivered());
headers.addBoolean(SolaceSourceConstants.SOL_SH_DISCARD_INDICATION, msg.getDiscardIndication());
headers.addBoolean(SolaceSourceConstants.SOL_SH_IS_DMQ_ELIGIBLE, msg.isDMQEligible());
headers.addBoolean(SolaceSourceConstants.SOL_SH_IS_ELIDING_ELIGIBLE, msg.isElidingEligible());
headers.addBoolean(SolaceSourceConstants.SOL_SH_IS_REPLY_MESSAGE, msg.isReplyMessage());

return headers;
}
}
src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConnectorConfig.java
@@ -36,7 +36,7 @@ public class SolaceSourceConnectorConfig extends AbstractConfig {
* Constructor to create Solace Configuration details for Source Connector.
* @param properties the configuration properties
*/
public SolaceSourceConnectorConfig(Map<String, String> properties) {
super(config, properties);

log.info("==================Initialize Connector properties");
@@ -254,7 +254,11 @@ public static ConfigDef solaceConfigDef() {
.define(SolaceSourceConstants.SOL_KERBEROS_LOGIN_CONFIG, Type.STRING, "", Importance.LOW,
"Location of the Kerberos Login Configuration File")
.define(SolaceSourceConstants.SOL_KAFKA_MESSAGE_KEY, Type.STRING, "NONE", Importance.MEDIUM,
"This propert determines if a Kafka key record is created and the key to be used");
"This property determines if a Kafka key record is created and the key to be used")
.define(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES, Type.BOOLEAN, false, Importance.MEDIUM,
"This property determines if Solace message user properties will be mapped to Kafka record headers")
.define(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES, Type.BOOLEAN, false, Importance.MEDIUM,
"This property determines if Solace message standard properties will be mapped to Kafka record headers");


}
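
For reference, the new BOOLEAN definitions can be read back through the `AbstractConfig` accessors this class inherits; a minimal sketch (helper class name is hypothetical):

```
import java.util.Map;

import com.solace.connector.kafka.connect.source.SolaceSourceConnectorConfig;
import com.solace.connector.kafka.connect.source.SolaceSourceConstants;

public class HeaderMappingFlags {
  // Returns true when user-property forwarding is enabled in the connector properties.
  public static boolean userPropertyMappingEnabled(Map<String, String> props) {
    SolaceSourceConnectorConfig config = new SolaceSourceConnectorConfig(props);
    return config.getBoolean(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES);
  }
}
```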
src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConstants.java
@@ -138,5 +138,33 @@ public class SolaceSourceConstants {
public static final String SOL_KERBEROS_LOGIN_CONFIG = "sol.kerberos.login.conf";
public static final String SOL_KERBEROS_KRB5_CONFIG = "sol.kerberos.krb5.conf";


// Medium Importance Solace Message processor
public static final String SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES = "sol.message_processor.map_user_properties";
public static final String SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES = "sol.message_processor.map_solace_standard_properties";


// All SOL_SH-prefixed constants are Solace standard message headers.
public static final String SOL_SH_APPLICATION_MESSAGE_ID = "solace_applicationMessageID";
public static final String SOL_SH_APPLICATION_MESSAGE_TYPE = "solace_applicationMessageType";
public static final String SOL_SH_CORRELATION_ID = "solace_correlationID";
public static final String SOL_SH_COS = "solace_cos";
public static final String SOL_SH_DELIVERY_MODE = "solace_deliveryMode";
public static final String SOL_SH_DESTINATION = "solace_destination";
public static final String SOL_SH_DISCARD_INDICATION = "solace_discardIndication";
public static final String SOL_SH_EXPIRATION = "solace_expiration";
public static final String SOL_SH_PRIORITY = "solace_priority";
public static final String SOL_SH_RECEIVE_TIMESTAMP = "solace_receiveTimestamp";
public static final String SOL_SH_REDELIVERED = "solace_redelivered";
public static final String SOL_SH_REPLY_TO = "solace_replyTo";
public static final String SOL_SH_REPLY_TO_DESTINATION_TYPE = "solace_replyToDestinationType";
public static final String SOL_SH_REPLY_TO_DESTINATION = "solace_replyToDestination";
public static final String SOL_SH_SENDER_ID = "solace_senderID";
public static final String SOL_SH_SENDER_TIMESTAMP = "solace_senderTimestamp";
public static final String SOL_SH_TIME_TO_LIVE = "solace_timeToLive";
public static final String SOL_SH_IS_DMQ_ELIGIBLE = "solace_DMQEligible";
public static final String SOL_SH_IS_ELIDING_ELIGIBLE = "solace_elidingEligible";
public static final String SOL_SH_IS_REPLY_MESSAGE = "solace_replyMessage";
public static final String SOL_SH_HTTP_CONTENT_ENCODING = "solace_httpContentEncoding";
public static final String SOL_SH_HTTP_CONTENT_TYPE = "solace_httpContentType";
public static final String SOL_SH_SEQUENCE_NUMBER = "solace_sequenceNumber";
}
src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java
@@ -19,60 +19,102 @@

package com.solace.connector.kafka.connect.source.msgprocessors;

import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES;
import static com.solace.connector.kafka.connect.source.SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES;
import com.solace.connector.kafka.connect.source.SolMessageProcessorIF;
import com.solacesystems.jcsmp.BytesXMLMessage;
//import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.TextMessage;

import java.nio.charset.Charset;

import java.nio.charset.StandardCharsets;

import java.util.LinkedList;
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolSampleSimpleMessageProcessor implements SolMessageProcessorIF {
public class SolSampleSimpleMessageProcessor implements SolMessageProcessorIF, Configurable {

private static final Logger log = LoggerFactory.getLogger(SolSampleSimpleMessageProcessor.class);
private Object smsg;
private String skey;
private Object sdestination;
private byte[] messageOut;
private LinkedList<Header> headers = new LinkedList<>();

private Map<String, ?> configs;
private boolean mapUserProperties;
private boolean mapSolaceStandardProperties;

@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;
this.mapUserProperties = getBooleanConfigProperty(SOL_MESSAGE_PROCESSOR_MAP_USER_PROPERTIES);
this.mapSolaceStandardProperties = getBooleanConfigProperty(
SOL_MESSAGE_PROCESSOR_MAP_SOLACE_STANDARD_PROPERTIES);
}

@Override
public SolMessageProcessorIF process(String skey, BytesXMLMessage msg) {
this.smsg = msg;
this.headers.clear();

if (log.isDebugEnabled()) {
log.debug("{} received.", msg.getClass().getName());
}
if (msg instanceof TextMessage) {
log.debug("Text Message received {}", ((TextMessage) msg).getText());
String smsg = ((TextMessage) msg).getText();
messageOut = smsg.getBytes(StandardCharsets.UTF_8);
} else {
log.debug("Message payload: {}", new String(msg.getBytes(), Charset.defaultCharset()));
if (msg.getBytes().length != 0) { // Binary XML pay load
messageOut = msg.getBytes();
} else { // Binary attachment pay load
messageOut = msg.getAttachmentByteBuffer().array();
}
}
log.debug("Message Dump:{}", msg.dump());

this.sdestination = msg.getDestination().getName();
log.debug("processing data for destination: {}; with message {}, with Kafka topic key of: {}",
(String) this.sdestination, msg, this.skey);
if (log.isDebugEnabled()) {
log.debug("processing data for destination: {}; with Kafka topic key of: {}",
this.sdestination, this.skey);
}
this.skey = skey;
this.smsg = messageOut;

if (mapUserProperties) {
ConnectHeaders userProps = userPropertiesToKafkaHeaders(msg);
userProps.iterator().forEachRemaining(headers::add);
}

if (mapSolaceStandardProperties) {
ConnectHeaders solaceProps = solacePropertiesToKafkaHeaders(msg);
solaceProps.iterator().forEachRemaining(headers::add);
}

return this;
}

@Override
public SourceRecord[] getRecords(String kafkaTopic) {
return new SourceRecord[] {
new SourceRecord(null, null, kafkaTopic, null, null,
null, Schema.BYTES_SCHEMA, smsg) };

return new SourceRecord[]{
new SourceRecord(null, null, kafkaTopic, null, null,
null, Schema.BYTES_SCHEMA, smsg, (Long) null, headers)};
}

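// Looks up a boolean flag in the raw connector config map, accepting either Boolean or String
// values; returns false when the property is absent or of an unexpected type.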
private boolean getBooleanConfigProperty(String name) {
if (this.configs != null && this.configs.containsKey(name)) {
final Object value = this.configs.get(name);
if (value instanceof String) {
return Boolean.parseBoolean((String) value);
} else if (value instanceof Boolean) {
return (boolean) value;
} else {
log.error("The value of property {} should be of type boolean or string.", name);
}
}
return false;
}
}