Skip to content

Commit

Permalink
Fix infinite dispatch loop in MP due to stale channel
Browse files Browse the repository at this point in the history
The issue occurred when the message processor repeatedly attempted to dispatch a message in a loop, leaving an unacknowledged message in the RabbitMQ queue. Over time, the channel to the queue became stale. Although there is a check in the dispatch logic for a healthy store connection, this check failed to terminate the loop, causing the dispatch process to run indefinitely. This commit addresses the issue by ensuring the loop exits when the channel becomes stale or the store connection is deemed unhealthy.

Fixes: wso2/product-micro-integrator#3771
  • Loading branch information
malakaganga committed Nov 28, 2024
1 parent 6e831e3 commit b12790a
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,10 @@ public interface MessageConsumer {
* @return ID
*/
public String getId();

/**
* Re-initializes the message consumer.
* @return {@code true} if re-initialization is successful, {@code false} otherwise.
*/
public boolean reInitialize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ private void tryToDispatchToEndpoint(MessageContext messageToDispatch, Endpoint
// For each retry we need to have a fresh copy of the original message
getFreshCopyOfOriginalMessage(messageToDispatch, originalEnvelop, originalJsonInputStream);

if (messageConsumer != null && messageConsumer.isAlive()) {
if (messageConsumer != null && (messageConsumer.isAlive() || messageConsumer.reInitialize())) {
messageToDispatch.setProperty(SynapseConstants.BLOCKING_MSG_SENDER, sender);
// Clear the message context properties related to endpoint in last service invocation
Set keySet = messageToDispatch.getPropertyKeySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public MessageContext receive() {
}
}

public boolean reInitialize() {
// To keep the existing behaviour, return false
return false;
}

/**
* Ack on success message sending by processor
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ private void reconnect() throws StoreForwardException {
}
}

public boolean reInitialize() {
// To keep the existing behaviour, return false
return false;
}

private final class CachedMessage {
private Message message = null;
private MessageContext mc = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,10 @@ public InMemoryConsumer setDestination(Queue<MessageContext> queue) {
this.queue = queue;
return this;
}

@Override
public boolean reInitialize() {
// To keep the existing behaviour, return false
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,26 @@ public MessageContext receive() {
}
} else {
log.warn("The connection and channel to the RabbitMQ broker are unhealthy.");
reInitialize();
}
return null;
}

/**
* Reconnect to the RabbitMQ broker by creating a new connection and channel.
* return {@code true} if re-initialization is successful, {@code false} otherwise.
*/
public boolean reInitialize() {
try {
log.info("Cleanup and Reinitializing connection and channel for " + getId());
cleanup();
setConnection(store.createConnection());
setChannel(store.createChannel(connection));
} catch (Exception e) {
log.error("Failed to reinitialize connection and channel.", e);
return false;
}
return null;
return true;
}

/**
Expand Down Expand Up @@ -136,14 +151,30 @@ public boolean ack() {
*/
@Override
public boolean cleanup() {
if (connection != null) {
connection.abort();
try {
if (channel != null) {
channel.abort(); // Forcefully close the channel
}
} catch (Exception e) {
//ignore
} finally {
channel = null; // Ensure channel is nullified
}
channel = null;
connection = null;
return true;

try {
if (connection != null) {
connection.abort(); // Forcefully close the connection
}
} catch (Exception e) {
//ignore
} finally {
connection = null; // Ensure connection is nullified
}

return true; // Indicating cleanup was attempted
}


/**
* Check availability of connectivity with the message store
*
Expand Down

0 comments on commit b12790a

Please sign in to comment.