Moved from SourceTask#commit() to SourceTask#commitRecord() to eliminate over-acknowledgment and prevent message loss
limitium authored and limitium committed Nov 7, 2024
1 parent 46b02db commit 4a0d367
Showing 1 changed file with 105 additions and 27 deletions.
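
For context, a minimal sketch of the two Kafka Connect hooks involved. The signatures are the real org.apache.kafka.connect.source.SourceTask API; the comments summarize the race this commit removes and are illustrative, not connector code.

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Illustration only: the two acknowledgment points a source task can use.
public abstract class HookContrast extends SourceTask {

  // Before: commit() fires on Connect's offset-commit interval and covers
  // everything polled so far, including records the producer may not yet
  // have written to Kafka. Acking Solace messages here can lose messages
  // on a crash (acked at the broker, never written to Kafka).
  @Override
  public void commit() throws InterruptedException {}

  // After: commitRecord() fires once per record after Kafka has acked the
  // write, so each Solace message can be acknowledged exactly when all of
  // its records are durable.
  @Override
  public void commitRecord(SourceRecord record, RecordMetadata metadata)
      throws InterruptedException {}
}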
@@ -19,22 +19,16 @@
 
 package com.solace.connector.kafka.connect.source;
 
-import com.solacesystems.jcsmp.BytesXMLMessage;
-import com.solacesystems.jcsmp.DeliveryMode;
-import com.solacesystems.jcsmp.JCSMPException;
-import com.solacesystems.jcsmp.JCSMPSession;
+import com.solacesystems.jcsmp.*;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.*;
+import java.util.concurrent.*;
 
 
 public class SolaceSourceTask extends SourceTask { // implements XMLMessageListener{
@@ -45,8 +39,16 @@ public class SolaceSourceTask extends SourceTask { // implements XMLMessageListe
   private SolSessionHandler solSessionHandler = null;
   BlockingQueue<BytesXMLMessage> ingressMessages
       = new LinkedBlockingQueue<>(); // LinkedBlockingQueue for any incoming message from PS+ topics and queue
-  BlockingQueue<BytesXMLMessage> outstandingAckList
-      = new LinkedBlockingQueue<>(); // LinkedBlockingQueue for Solace Flow messages
+  Map<BytesXMLMessage, Integer> pendingAcks = new HashMap<>(); // Pending ack per Solace message, with the number of records created from it
+  Map<SourceRecord, BytesXMLMessage> recordToMessage = new HashMap<>(); // Maps each Kafka record back to its originating Solace message
+
+  // Scheduled buffer for acknowledgments
+  private final List<BytesXMLMessage> ackBuffer = new ArrayList<>(); // Buffer for acknowledgments
+  private int ackBufferSize; // Maximum buffer size before flush
+
+  private long lastMessageAddedTime = System.currentTimeMillis(); // Track time of last addition to the buffer
+  private final ScheduledExecutorService flushScheduler = Executors.newScheduledThreadPool(1);
+
   String skafkaTopic;
   SolaceSourceTopicListener topicListener = null;
   SolaceSourceQueueConsumer queueConsumer = null;
@@ -56,6 +58,7 @@ public class SolaceSourceTask extends SourceTask { // implements XMLMessageListe
 
   // private Class<?> cProcessor;
   private SolMessageProcessorIF processor;
+  private int ackTimeout;
 
   @Override
   public String version() {
@@ -98,6 +101,26 @@ public void start(Map<String, String> props) {
         throw new ConnectException("Failed to start queue consumer", e);
       }
     }
+
+    setupScheduler();
   }
+
+  private void setupScheduler() {
+    // Align the scheduler and buffer with the underlying flowHandle.startAckTimer() and UnackedMessageList2.thresholdCount
+    int subWinSz = (Integer) solSessionHandler.getSession().getProperty(JCSMPProperties.SUB_ACK_WINDOW_SIZE);
+    int subThreshold = (Integer) solSessionHandler.getSession().getProperty(JCSMPProperties.SUB_ACK_WINDOW_THRESHOLD);
+    ackBufferSize = subWinSz * subThreshold / 100;
+
+    ackTimeout = (Integer) solSessionHandler.getSession().getProperty(JCSMPProperties.SUB_ACK_TIME);
+
+    // Start the scheduled task that periodically flushes the buffer
+    flushScheduler.scheduleAtFixedRate(() -> {
+      try {
+        flushAckBufferIfNeeded();
+      } catch (Exception e) {
+        log.error("Error during scheduled ack buffer flush", e);
+      }
+    }, ackTimeout, ackTimeout, TimeUnit.MILLISECONDS);
+  }
 
   @Override
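
A worked example of the sizing in setupScheduler() above, using what appear to be the JCSMP defaults (SUB_ACK_WINDOW_SIZE = 255, SUB_ACK_WINDOW_THRESHOLD = 60 percent, SUB_ACK_TIME = 1000 ms; verify against the JCSMPProperties documentation for your API version):

// Standalone arithmetic check; values are assumed defaults, not read from a session.
public class AckBufferSizing {
  public static void main(String[] args) {
    int subWinSz = 255;      // assumed SUB_ACK_WINDOW_SIZE default
    int subThreshold = 60;   // assumed SUB_ACK_WINDOW_THRESHOLD default, in percent
    int ackTimeout = 1000;   // assumed SUB_ACK_TIME default, in ms

    int ackBufferSize = subWinSz * subThreshold / 100; // 255 * 60 / 100 = 153
    System.out.printf("flush after %d buffered acks or %d ms of inactivity%n",
        ackBufferSize, ackTimeout);
  }
}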
@@ -132,38 +155,72 @@ public synchronized List<SourceRecord> poll() throws InterruptedException {
       } catch (Exception e) {
         if (connectorConfig.getBoolean(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_IGNORE_ERROR)) {
           log.warn("================ Encountered exception in message processing....discarded.", e);
-          scheduleForAck(msg);
+          msg.ackMessage(); // Effectively discards the Solace message
           discarded++;
           continue;
         } else {
           throw new ConnectException("Encountered exception in message processing", e);
         }
       }
-      Collections.addAll(records, processor.getRecords(skafkaTopic));
-      scheduleForAck(msg);
+      SourceRecord[] processorRecords = processor.getRecords(skafkaTopic);
+      Collections.addAll(records, processorRecords);
+      scheduleForAck(msg, processorRecords);
     }
     log.debug("Processed {} records in this batch. Discarded {}", processedInThisBatch - discarded, discarded);
     return records;
   }
 
-  private synchronized void scheduleForAck(BytesXMLMessage msg) {
+  private synchronized void scheduleForAck(BytesXMLMessage msg, SourceRecord[] processorRecords) {
     if (msg.getDeliveryMode() == DeliveryMode.NON_PERSISTENT
         || msg.getDeliveryMode() == DeliveryMode.PERSISTENT) {
-      outstandingAckList.add(msg); // enqueue messages received from guaranteed messaging endpoint for later ack
+      for (SourceRecord processorRecord : processorRecords) {
+        recordToMessage.put(processorRecord, msg); // Map each record to its Solace message
+      }
+      pendingAcks.put(msg, processorRecords.length); // enqueue messages received from a guaranteed messaging endpoint for later ack
     }
   }
 
-  /**
-   * Kafka Connect method that write records to disk.
-   */
-  public synchronized void commit() throws InterruptedException {
-    log.trace("Committing records");
-    int currentLoad = outstandingAckList.size();
-    int count = 0;
-    while (count != currentLoad) {
-      outstandingAckList.take().ackMessage();
-      count++;
+  @Override
+  public synchronized void commitRecord(SourceRecord record, RecordMetadata metadata) {
+    BytesXMLMessage msg = recordToMessage.remove(record);
+    if (msg == null) {
+      log.error("Unable to find message for record {}", record); // Shouldn't happen
+      return;
+    }
+
+    if (!pendingAcks.containsKey(msg)) {
+      log.error("Unable to find message counter for message {}", msg); // Shouldn't happen
+    }
+
+    pendingAcks.computeIfPresent(msg, (k, o) -> o > 1 ? --o : null); // Decrement the per-message record counter; remove the entry on the last record
+
+    if (!pendingAcks.containsKey(msg)) { // The last record in the group was committed
+      ackBuffer.add(msg);
+      lastMessageAddedTime = System.currentTimeMillis(); // Update the last-addition time
+
+      // Flush the buffer once it reaches the maximum buffer size
+      if (ackBuffer.size() >= ackBufferSize) {
+        flushAckBuffer();
+      }
+      log.debug("Buffered ack for message {}", msg);
     }
   }
 
+  private synchronized void flushAckBufferIfNeeded() {
+    long currentTime = System.currentTimeMillis();
+    if (!ackBuffer.isEmpty() && (currentTime - lastMessageAddedTime) >= ackTimeout) {
+      flushAckBuffer();
+    }
+  }
+
+  private synchronized void flushAckBuffer() {
+    for (BytesXMLMessage msg : ackBuffer) {
+      msg.ackMessage();
+      log.debug("Acknowledged message {}", msg);
+    }
+
+    ackBuffer.clear(); // Clear the buffer after acknowledgment
+    log.debug("Flushed acknowledgment buffer");
+  }
 
   @Override
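
The heart of the change is the countdown in commitRecord(): one Solace message can fan out into several Kafka records, and the message may only be acked once the last of them has been committed. A self-contained sketch of that idiom (String stands in for BytesXMLMessage; the names are illustrative, not the connector's code):

import java.util.HashMap;
import java.util.Map;

public class AckCountdownSketch {
  static final Map<String, Integer> pendingAcks = new HashMap<>();

  // poll() side: remember how many records the message produced.
  static void scheduleForAck(String msg, int recordCount) {
    pendingAcks.put(msg, recordCount);
  }

  // commitRecord() side: computeIfPresent removes the entry when the
  // remapping function returns null, i.e. on the last committed record.
  static boolean commitOne(String msg) {
    pendingAcks.computeIfPresent(msg, (k, n) -> n > 1 ? n - 1 : null);
    return !pendingAcks.containsKey(msg); // true -> safe to ack the message
  }

  public static void main(String[] args) {
    scheduleForAck("msg-1", 3);
    System.out.println(commitOne("msg-1")); // false, 2 records outstanding
    System.out.println(commitOne("msg-1")); // false, 1 record outstanding
    System.out.println(commitOne("msg-1")); // true, ack now
  }
}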
@@ -183,6 +240,27 @@ public synchronized void stop() {
     }
     solSessionHandler = null; // At this point filling the ingress queue is stopped
     ingressMessages.clear(); // Remove all remaining ingressed messages; these will no longer be imported to Kafka
+    recordToMessage.clear();
+
+    if (!pendingAcks.isEmpty()) {
+      log.warn("Unacked messages remain; redelivered messages may create duplicates in Kafka");
+      pendingAcks.forEach((msg, count) -> log.warn("Dup: {}", msg));
+      pendingAcks.clear();
+    }
+
+    // Flush remaining messages in the buffer
+    flushAckBuffer();
+    if (!flushScheduler.isShutdown()) {
+      flushScheduler.shutdown();
+      try {
+        if (!flushScheduler.awaitTermination(500, TimeUnit.MILLISECONDS)) {
+          flushScheduler.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        flushScheduler.shutdownNow();
+      }
+    }
+
     log.info("PubSub+ Source Connector stopped");
   }
 
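One detail worth noting in setupScheduler(): the try/catch inside the scheduled lambda is load-bearing, because scheduleAtFixedRate suppresses all subsequent executions of a task after one uncaught exception. A minimal demonstration (illustrative, not connector code):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FlusherSurvival {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    scheduler.scheduleAtFixedRate(() -> {
      try {
        System.out.println("flush tick");
        // a throwing flushAckBufferIfNeeded() would land in the catch below
      } catch (Exception e) {
        e.printStackTrace(); // swallow, so the schedule keeps firing
      }
    }, 100, 100, TimeUnit.MILLISECONDS);

    Thread.sleep(350); // observe a few ticks
    scheduler.shutdownNow();
  }
}

The shutdown/awaitTermination/shutdownNow sequence in stop() above is the standard counterpart for winding the same scheduler down.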
