diff --git a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java index 13b2432..27ac02c 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java @@ -19,22 +19,16 @@ package com.solace.connector.kafka.connect.source; -import com.solacesystems.jcsmp.BytesXMLMessage; -import com.solacesystems.jcsmp.DeliveryMode; -import com.solacesystems.jcsmp.JCSMPException; -import com.solacesystems.jcsmp.JCSMPSession; +import com.solacesystems.jcsmp.*; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.*; +import java.util.concurrent.*; public class SolaceSourceTask extends SourceTask { // implements XMLMessageListener{ @@ -45,8 +39,16 @@ public class SolaceSourceTask extends SourceTask { // implements XMLMessageListe private SolSessionHandler solSessionHandler = null; BlockingQueue ingressMessages = new LinkedBlockingQueue<>(); // LinkedBlockingQueue for any incoming message from PS+ topics and queue - BlockingQueue outstandingAckList - = new LinkedBlockingQueue<>(); // LinkedBlockingQueue for Solace Flow messages + Map pendingAcks = new HashMap<>(); // Pending acks for solace message with number of created record + Map recordToMessage = new HashMap<>(); // Map record to solace message + + //Scheduled buffer for acks, + private final List ackBuffer = new ArrayList<>(); // Buffer for acknowledgments + private int ackBufferSize; // Maximum buffer size before flush + + private long lastMessageAddedTime = System.currentTimeMillis(); // Track time of last addition to the buffer + private final ScheduledExecutorService flushScheduler = Executors.newScheduledThreadPool(1); + String skafkaTopic; SolaceSourceTopicListener topicListener = null; SolaceSourceQueueConsumer queueConsumer = null; @@ -56,6 +58,7 @@ public class SolaceSourceTask extends SourceTask { // implements XMLMessageListe // private Class cProcessor; private SolMessageProcessorIF processor; + private int ackTimeout; @Override public String version() { @@ -98,6 +101,26 @@ public void start(Map props) { throw new ConnectException("Failed to start queue consumer", e); } } + + setupScheduler(); + } + + private void setupScheduler() { + //Align scheduler and buffer with underlying flowHandle.startAckTimer() and UnackedMessageList2.thresholdCount + int subWinSz = (Integer)solSessionHandler.getSession().getProperty(JCSMPProperties.SUB_ACK_WINDOW_SIZE); + int subThreshold = (Integer)solSessionHandler.getSession().getProperty(JCSMPProperties.SUB_ACK_WINDOW_THRESHOLD); + ackBufferSize = subWinSz * subThreshold / 100; + + ackTimeout = (Integer)solSessionHandler.getSession().getProperty(JCSMPProperties.SUB_ACK_TIME); + + // Start the scheduled task to periodically flush the buffer + flushScheduler.scheduleAtFixedRate(() -> { + try { + flushAckBufferIfNeeded(); + } catch (Exception e) { + log.error("Error during scheduled ack buffer flush", e); + } + }, ackTimeout, ackTimeout, TimeUnit.MILLISECONDS); } @Override @@ -132,38 +155,72 @@ public synchronized List poll() throws InterruptedException { } catch (Exception e) { if (connectorConfig.getBoolean(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_IGNORE_ERROR)) { log.warn("================ Encountered exception in message processing....discarded.", e); - scheduleForAck(msg); + msg.ackMessage(); // Effective discard solace message discarded++; continue; } else { throw new ConnectException("Encountered exception in message processing", e); } } - Collections.addAll(records, processor.getRecords(skafkaTopic)); - scheduleForAck(msg); + SourceRecord[] processorRecords = processor.getRecords(skafkaTopic); + Collections.addAll(records, processorRecords); + scheduleForAck(msg, processorRecords); } log.debug("Processed {} records in this batch. Discarded {}", processedInThisBatch - discarded, discarded); return records; } - private synchronized void scheduleForAck(BytesXMLMessage msg) { + private synchronized void scheduleForAck(BytesXMLMessage msg, SourceRecord[] processorRecords) { if (msg.getDeliveryMode() == DeliveryMode.NON_PERSISTENT || msg.getDeliveryMode() == DeliveryMode.PERSISTENT) { - outstandingAckList.add(msg); // enqueue messages received from guaranteed messaging endpoint for later ack + for (SourceRecord processorRecord : processorRecords) { + recordToMessage.put(processorRecord, msg); // Map each record to solace message id + } + pendingAcks.put(msg, processorRecords.length); // enqueue messages received from guaranteed messaging endpoint for later ack } } - /** - * Kafka Connect method that write records to disk. - */ - public synchronized void commit() throws InterruptedException { - log.trace("Committing records"); - int currentLoad = outstandingAckList.size(); - int count = 0; - while (count != currentLoad) { - outstandingAckList.take().ackMessage(); - count++; + @Override + public synchronized void commitRecord(SourceRecord record, RecordMetadata metadata) { + BytesXMLMessage msg = recordToMessage.remove(record); + if (msg == null) { + log.error("Unable to find message for record {}", record); // Shouldn't happens + return; } + + if (!pendingAcks.containsKey(msg)) { + log.error("Unable to find message counter for message {}", msg); // Shouldn't happens + } + + pendingAcks.computeIfPresent(msg, (k, o) -> o > 1 ? --o : null);// Reduce counter of records per message, remove on last + + if (!pendingAcks.containsKey(msg)) {// Last record was commited in the group + ackBuffer.add(msg); + lastMessageAddedTime = System.currentTimeMillis(); // Update last message addition time + + // Flush the buffer if it reaches the maximum buffer size + if (ackBuffer.size() >= ackBufferSize) { + flushAckBuffer(); + } + log.debug("Buffer ack for message {}", msg); + } + } + + private synchronized void flushAckBufferIfNeeded() { + long currentTime = System.currentTimeMillis(); + if (!ackBuffer.isEmpty() && (currentTime - lastMessageAddedTime) >= ackTimeout) { + flushAckBuffer(); + } + } + + private synchronized void flushAckBuffer() { + for (BytesXMLMessage msg : ackBuffer) { + msg.ackMessage(); + log.debug("Acknowledged message {}", msg); + } + + ackBuffer.clear(); // Clear the buffer after acknowledgment + log.debug("Flushed acknowledgment buffer"); } @Override @@ -183,6 +240,27 @@ public synchronized void stop() { } solSessionHandler = null; // At this point filling the ingress queue is stopped ingressMessages.clear(); // Remove all remaining ingressed messages, these will be no longer imported to Kafka + recordToMessage.clear(); + + if (!pendingAcks.isEmpty()) { + log.warn("Potential duplicates might be spotted"); + pendingAcks.forEach((s, m) -> log.warn("Dup: {}", m)); + pendingAcks.clear(); + } + + // Flush remaining messages in the buffer + flushAckBuffer(); + if (!flushScheduler.isShutdown()) { + flushScheduler.shutdown(); + try { + if (!flushScheduler.awaitTermination(500, TimeUnit.MILLISECONDS)) { + flushScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + flushScheduler.shutdownNow(); + } + } + log.info("PubSub+ Source Connector stopped"); }