Moved from SourceTask#commit() to SourceTask#commitRecord() #76

Open · wants to merge 1 commit into master
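
This change moves Solace message acknowledgement from the task-wide SourceTask#commit() hook to the per-record SourceTask#commitRecord(SourceRecord, RecordMetadata) hook: a Solace message is now acked only after every Kafka record created from it has been confirmed as written to Kafka, and the resulting acks are batched before being sent back to the broker. For context, a minimal sketch of the two Kafka Connect hooks involved (hypothetical class name, not the connector code itself):

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public abstract class AckHookSketch extends SourceTask {

  // Old hook: called when the framework commits offsets for the task as a whole;
  // the connector previously acked every outstanding Solace message here in one sweep.
  @Override
  public void commit() throws InterruptedException {
  }

  // New hook: called once per record after the Kafka producer has acknowledged the write,
  // so a Solace message can be acked only once all records created from it have reached Kafka.
  @Override
  public void commitRecord(SourceRecord record, RecordMetadata metadata) {
  }
}

Acking per record, and batching the resulting acks, preserves at-least-once delivery when a single Solace message fans out into several Kafka records.
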
@@ -19,22 +19,16 @@

package com.solace.connector.kafka.connect.source;

import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.*;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.*;
import java.util.concurrent.*;


public class SolaceSourceTask extends SourceTask { // implements XMLMessageListener{
@@ -45,8 +39,16 @@ public class SolaceSourceTask extends SourceTask { // implements XMLMessageListener{
private SolSessionHandler solSessionHandler = null;
BlockingQueue<BytesXMLMessage> ingressMessages
= new LinkedBlockingQueue<>(); // LinkedBlockingQueue for any incoming messages from PS+ topics and queues
BlockingQueue<BytesXMLMessage> outstandingAckList
= new LinkedBlockingQueue<>(); // LinkedBlockingQueue for Solace Flow messages
Map<BytesXMLMessage, Integer> pendingAcks = new HashMap<>(); // Pending acks: each Solace message mapped to the number of records created from it
Map<SourceRecord, BytesXMLMessage> recordToMessage = new HashMap<>(); // Maps each Kafka record back to its originating Solace message

// Scheduled buffer for acks
private final List<BytesXMLMessage> ackBuffer = new ArrayList<>(); // Buffer for acknowledgments
private int ackBufferSize; // Maximum buffer size before flush

private long lastMessageAddedTime = System.currentTimeMillis(); // Track time of last addition to the buffer
private final ScheduledExecutorService flushScheduler = Executors.newScheduledThreadPool(1);
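// Design note: acks are batched. Once every record created from a message has been committed,
// the message is moved to ackBuffer; the buffer is flushed when it reaches ackBufferSize or
// after it has been idle for ackTimeout milliseconds.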

String skafkaTopic;
SolaceSourceTopicListener topicListener = null;
SolaceSourceQueueConsumer queueConsumer = null;
@@ -56,6 +58,7 @@ public class SolaceSourceTask extends SourceTask { // implements XMLMessageListener{

// private Class<?> cProcessor;
private SolMessageProcessorIF processor;
private int ackTimeout;

@Override
public String version() {
@@ -98,6 +101,26 @@ public void start(Map<String, String> props) {
throw new ConnectException("Failed to start queue consumer", e);
}
}

setupScheduler();
}

private void setupScheduler() {
// Align the scheduler and buffer with the underlying flowHandle.startAckTimer() and UnackedMessageList2.thresholdCount
int subWinSz = (Integer)solSessionHandler.getSession().getProperty(JCSMPProperties.SUB_ACK_WINDOW_SIZE);
int subThreshold = (Integer)solSessionHandler.getSession().getProperty(JCSMPProperties.SUB_ACK_WINDOW_THRESHOLD);
ackBufferSize = subWinSz * subThreshold / 100;
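// Example: with a subscriber ack window of 255 messages and a threshold of 60%, the buffer flushes once 255 * 60 / 100 = 153 acks are pending.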

ackTimeout = (Integer)solSessionHandler.getSession().getProperty(JCSMPProperties.SUB_ACK_TIME);

// Start the scheduled task to periodically flush the buffer
flushScheduler.scheduleAtFixedRate(() -> {
try {
flushAckBufferIfNeeded();
} catch (Exception e) {
log.error("Error during scheduled ack buffer flush", e);
}
}, ackTimeout, ackTimeout, TimeUnit.MILLISECONDS);
}

@Override
@@ -132,38 +155,72 @@ public synchronized List<SourceRecord> poll() throws InterruptedException {
} catch (Exception e) {
if (connectorConfig.getBoolean(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_IGNORE_ERROR)) {
log.warn("================ Encountered exception in message processing....discarded.", e);
scheduleForAck(msg);
msg.ackMessage(); // Effectively discards the Solace message
discarded++;
continue;
} else {
throw new ConnectException("Encountered exception in message processing", e);
}
}
Collections.addAll(records, processor.getRecords(skafkaTopic));
scheduleForAck(msg);
SourceRecord[] processorRecords = processor.getRecords(skafkaTopic);
Collections.addAll(records, processorRecords);
scheduleForAck(msg, processorRecords);
}
log.debug("Processed {} records in this batch. Discarded {}", processedInThisBatch - discarded, discarded);
return records;
}

private synchronized void scheduleForAck(BytesXMLMessage msg) {
private synchronized void scheduleForAck(BytesXMLMessage msg, SourceRecord[] processorRecords) {
if (msg.getDeliveryMode() == DeliveryMode.NON_PERSISTENT
|| msg.getDeliveryMode() == DeliveryMode.PERSISTENT) {
outstandingAckList.add(msg); // enqueue messages received from guaranteed messaging endpoint for later ack
for (SourceRecord processorRecord : processorRecords) {
recordToMessage.put(processorRecord, msg); // Map each record to its originating Solace message
}
pendingAcks.put(msg, processorRecords.length); // Track messages received from a guaranteed messaging endpoint, with their record count, for later ack
}
}

/**
* Kafka Connect method that writes records to disk.
*/
public synchronized void commit() throws InterruptedException {
log.trace("Committing records");
int currentLoad = outstandingAckList.size();
int count = 0;
while (count != currentLoad) {
outstandingAckList.take().ackMessage();
count++;
@Override
public synchronized void commitRecord(SourceRecord record, RecordMetadata metadata) {
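// Called by the Connect framework after the Kafka producer has acknowledged this record, so the Solace ack is released only once the record has safely reached Kafka.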
BytesXMLMessage msg = recordToMessage.remove(record);
if (msg == null) {
log.error("Unable to find message for record {}", record); // Shouldn't happens
return;
}

if (!pendingAcks.containsKey(msg)) {
log.error("Unable to find message counter for message {}", msg); // Shouldn't happens
}

pendingAcks.computeIfPresent(msg, (k, o) -> o > 1 ? --o : null); // Decrement the record counter for this message; remove the entry when the last record commits

if (!pendingAcks.containsKey(msg)) { // Last record in the group was committed
ackBuffer.add(msg);
lastMessageAddedTime = System.currentTimeMillis(); // Update last message addition time

// Flush the buffer if it reaches the maximum buffer size
if (ackBuffer.size() >= ackBufferSize) {
flushAckBuffer();
}
log.debug("Buffer ack for message {}", msg);
}
}

private synchronized void flushAckBufferIfNeeded() {
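// Flush only when the buffer is non-empty and no new ack has been added for at least ackTimeout milliseconds.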
long currentTime = System.currentTimeMillis();
if (!ackBuffer.isEmpty() && (currentTime - lastMessageAddedTime) >= ackTimeout) {
flushAckBuffer();
}
}

private synchronized void flushAckBuffer() {
for (BytesXMLMessage msg : ackBuffer) {
msg.ackMessage();
log.debug("Acknowledged message {}", msg);
}

ackBuffer.clear(); // Clear the buffer after acknowledgment
log.debug("Flushed acknowledgment buffer");
}

@Override
@@ -183,6 +240,27 @@ public synchronized void stop() {
}
solSessionHandler = null; // At this point filling the ingress queue is stopped
ingressMessages.clear(); // Remove all remaining ingressed messages; these will no longer be imported to Kafka
recordToMessage.clear();

if (!pendingAcks.isEmpty()) {
log.warn("Potential duplicates might be spotted");
pendingAcks.forEach((s, m) -> log.warn("Dup: {}", m));
pendingAcks.clear();
}

// Flush remaining messages in the buffer
flushAckBuffer();
if (!flushScheduler.isShutdown()) {
flushScheduler.shutdown();
try {
if (!flushScheduler.awaitTermination(500, TimeUnit.MILLISECONDS)) {
flushScheduler.shutdownNow();
}
} catch (InterruptedException e) {
flushScheduler.shutdownNow();
}
}

log.info("PubSub+ Source Connector stopped");
}
