From 85e9d4e827ae4d699cec4fe130c7f96117b5e324 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bal=C3=A1zs=20Czoma?= Date: Thu, 24 Sep 2020 09:53:48 -0400 Subject: [PATCH] Flush fix (#22) * Kafka flush() is now used when a manual commit is required * Uprev'd version to 2.0.2 --- .travis.yml | 16 ++++++++++++++++ gradle.properties | 2 +- .../kafka/connect/sink/SolaceSinkSender.java | 19 +++++++++++++++---- .../kafka/connect/sink/SolaceSinkTask.java | 2 +- .../kafka/connect/sink/VersionUtil.java | 2 +- 5 files changed, 34 insertions(+), 7 deletions(-) diff --git a/.travis.yml b/.travis.yml index 2f7fb1a..2f1fe2b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -27,4 +27,20 @@ after_success: git remote add origin-pages https://${GH_TOKEN}@github.com/SolaceProducts/pubsubplus-connector-kafka-sink.git > /dev/null 2>&1; git push --quiet --set-upstream origin-pages gh-pages; echo "Updated and pushed GH pages!"; + elif [ "$TRAVIS_PULL_REQUEST" = "false" ] && ! [ $(echo $TRAVIS_REPO_SLUG | grep "SolaceProducts") ]; then + git config --global user.email "travis@travis-ci.org"; + git config --global user.name "travis-ci"; + mkdir gh-pages; # Now update gh-pages + git clone --quiet --branch=gh-pages https://${GH_TOKEN}@github.com/$TRAVIS_REPO_SLUG gh-pages > /dev/null 2>&1; + rm gh-pages/downloads/pubsubplus-connector-kafka-sink* + mv build/distributions/pubsubplus-connector-kafka-sink* gh-pages/downloads + cd gh-pages; + pushd downloads + cp index.template index.html; FILENAME=`find . | grep *.zip | cut -d'/' -f2 | sed 's/.\{4\}$//'`; sed -i "s/CONNECTOR_NAME/$FILENAME/g" index.html; + popd; + git add -f .; + git commit -m "Latest connector distribution on successful travis build $TRAVIS_BUILD_NUMBER auto-pushed to gh-pages"; + git remote add origin-pages https://${GH_TOKEN}@github.com/$TRAVIS_REPO_SLUG.git > /dev/null 2>&1; + git push --quiet --set-upstream origin-pages gh-pages; + echo "Updated and pushed GH pages!"; fi diff --git a/gradle.properties b/gradle.properties index 516314f..fb7cb53 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=2.0.1 \ No newline at end of file +version=2.0.2 \ No newline at end of file diff --git a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkSender.java b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkSender.java index 1f8fb17..af103f1 100644 --- a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkSender.java +++ b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkSender.java @@ -31,9 +31,13 @@ import com.solacesystems.jcsmp.Topic; import com.solacesystems.jcsmp.XMLMessageProducer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,19 +57,24 @@ public class SolaceSinkSender { private SolRecordProcessorIF processor; private String kafkaKey; private AtomicInteger txMsgCounter = new AtomicInteger(); - + private SolaceSinkTask sinkTask; + private Map offsets + = new HashMap(); + /** * Class that sends Solace Messages from Kafka Records. * @param sconfig JCSMP Configuration * @param sessionHandler SolSessionHandler - * @param useTxforQueue + * @param useTxforQueue + * @param sinkTask Connector Sink Task * @throws JCSMPException */ public SolaceSinkSender(SolaceSinkConnectorConfig sconfig, SolSessionHandler sessionHandler, - boolean useTxforQueue) throws JCSMPException { + boolean useTxforQueue, SolaceSinkTask sinkTask) throws JCSMPException { this.sconfig = sconfig; this.sessionHandler = sessionHandler; this.useTxforQueue = useTxforQueue; + this.sinkTask = sinkTask; kafkaKey = sconfig.getString(SolaceSinkConstants.SOL_KAFKA_MESSAGE_KEY); topicProducer = sessionHandler.getSession().getMessageProducer(new SolStreamingMessageCallbackHandler()); cprocessor = (this.sconfig.getClass(SolaceSinkConstants.SOL_RECORD_PROCESSOR)); @@ -118,6 +127,8 @@ public void setupDestinationQueue() throws JCSMPException { public void sendRecord(SinkRecord record) { try { message = processor.processRecord(kafkaKey, record); + offsets.put(new TopicPartition(record.topic(), record.kafkaPartition()), + new OffsetAndMetadata(record.kafkaOffset())); log.trace("================ Processed record details, topic: {}, Partition: {}, " + "Offset: {}", record.topic(), record.kafkaPartition(), record.kafkaOffset()); @@ -189,7 +200,7 @@ public void sendRecord(SinkRecord record) { // Solace limits transaction size to 255 messages so need to force commit if ( useTxforQueue && txMsgCounter.get() > sconfig.getInt(SolaceSinkConstants.SOL_QUEUE_MESSAGES_AUTOFLUSH_SIZE)-1 ) { log.debug("================ Queue transaction autoflush size reached, flushing offsets from connector"); - commit(); + sinkTask.flush(offsets); } } diff --git a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkTask.java b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkTask.java index 025f8e0..eff2b5f 100644 --- a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkTask.java +++ b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkTask.java @@ -74,7 +74,7 @@ public void start(Map props) { } try { - solSender = new SolaceSinkSender(connectorConfig, solSessionHandler, useTxforQueue); + solSender = new SolaceSinkSender(connectorConfig, solSessionHandler, useTxforQueue, this); if (connectorConfig.getString(SolaceSinkConstants.SOL_TOPICS) != null) { solSender.setupDestinationTopics(); } diff --git a/src/main/java/com/solace/connector/kafka/connect/sink/VersionUtil.java b/src/main/java/com/solace/connector/kafka/connect/sink/VersionUtil.java index 0644387..54fda38 100644 --- a/src/main/java/com/solace/connector/kafka/connect/sink/VersionUtil.java +++ b/src/main/java/com/solace/connector/kafka/connect/sink/VersionUtil.java @@ -7,7 +7,7 @@ public class VersionUtil { */ public static String getVersion() { - return "2.0.1"; + return "2.0.2"; } }