Skip to content

Commit

Permalink
Flush fix (#22)
Browse files Browse the repository at this point in the history
* Kafka flush() is now used when a manual commit is required

* Uprev'd version to 2.0.2
  • Loading branch information
bczoma authored Sep 24, 2020
1 parent 5ecbea4 commit 85e9d4e
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 7 deletions.
16 changes: 16 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,20 @@ after_success:
git remote add origin-pages https://${GH_TOKEN}@github.com/SolaceProducts/pubsubplus-connector-kafka-sink.git > /dev/null 2>&1;
git push --quiet --set-upstream origin-pages gh-pages;
echo "Updated and pushed GH pages!";
elif [ "$TRAVIS_PULL_REQUEST" = "false" ] && ! [ $(echo $TRAVIS_REPO_SLUG | grep "SolaceProducts") ]; then
git config --global user.email "[email protected]";
git config --global user.name "travis-ci";
mkdir gh-pages; # Now update gh-pages
git clone --quiet --branch=gh-pages https://${GH_TOKEN}@github.com/$TRAVIS_REPO_SLUG gh-pages > /dev/null 2>&1;
rm gh-pages/downloads/pubsubplus-connector-kafka-sink*
mv build/distributions/pubsubplus-connector-kafka-sink* gh-pages/downloads
cd gh-pages;
pushd downloads
cp index.template index.html; FILENAME=`find . | grep *.zip | cut -d'/' -f2 | sed 's/.\{4\}$//'`; sed -i "s/CONNECTOR_NAME/$FILENAME/g" index.html;
popd;
git add -f .;
git commit -m "Latest connector distribution on successful travis build $TRAVIS_BUILD_NUMBER auto-pushed to gh-pages";
git remote add origin-pages https://${GH_TOKEN}@github.com/$TRAVIS_REPO_SLUG.git > /dev/null 2>&1;
git push --quiet --set-upstream origin-pages gh-pages;
echo "Updated and pushed GH pages!";
fi
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=2.0.1
version=2.0.2
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessageProducer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,19 +57,24 @@ public class SolaceSinkSender {
private SolRecordProcessorIF processor;
private String kafkaKey;
private AtomicInteger txMsgCounter = new AtomicInteger();

private SolaceSinkTask sinkTask;
private Map<TopicPartition, OffsetAndMetadata> offsets
= new HashMap<TopicPartition, OffsetAndMetadata>();

/**
* Class that sends Solace Messages from Kafka Records.
* @param sconfig JCSMP Configuration
* @param sessionHandler SolSessionHandler
* @param useTxforQueue
* @param useTxforQueue
* @param sinkTask Connector Sink Task
* @throws JCSMPException
*/
public SolaceSinkSender(SolaceSinkConnectorConfig sconfig, SolSessionHandler sessionHandler,
boolean useTxforQueue) throws JCSMPException {
boolean useTxforQueue, SolaceSinkTask sinkTask) throws JCSMPException {
this.sconfig = sconfig;
this.sessionHandler = sessionHandler;
this.useTxforQueue = useTxforQueue;
this.sinkTask = sinkTask;
kafkaKey = sconfig.getString(SolaceSinkConstants.SOL_KAFKA_MESSAGE_KEY);
topicProducer = sessionHandler.getSession().getMessageProducer(new SolStreamingMessageCallbackHandler());
cprocessor = (this.sconfig.getClass(SolaceSinkConstants.SOL_RECORD_PROCESSOR));
Expand Down Expand Up @@ -118,6 +127,8 @@ public void setupDestinationQueue() throws JCSMPException {
public void sendRecord(SinkRecord record) {
try {
message = processor.processRecord(kafkaKey, record);
offsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
new OffsetAndMetadata(record.kafkaOffset()));
log.trace("================ Processed record details, topic: {}, Partition: {}, "
+ "Offset: {}", record.topic(),
record.kafkaPartition(), record.kafkaOffset());
Expand Down Expand Up @@ -189,7 +200,7 @@ public void sendRecord(SinkRecord record) {
// Solace limits transaction size to 255 messages so need to force commit
if ( useTxforQueue && txMsgCounter.get() > sconfig.getInt(SolaceSinkConstants.SOL_QUEUE_MESSAGES_AUTOFLUSH_SIZE)-1 ) {
log.debug("================ Queue transaction autoflush size reached, flushing offsets from connector");
commit();
sinkTask.flush(offsets);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void start(Map<String, String> props) {
}

try {
solSender = new SolaceSinkSender(connectorConfig, solSessionHandler, useTxforQueue);
solSender = new SolaceSinkSender(connectorConfig, solSessionHandler, useTxforQueue, this);
if (connectorConfig.getString(SolaceSinkConstants.SOL_TOPICS) != null) {
solSender.setupDestinationTopics();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class VersionUtil {
*/
public static String getVersion() {

return "2.0.1";
return "2.0.2";

}
}

0 comments on commit 85e9d4e

Please sign in to comment.