Skip to content

Commit

Permalink
Merge pull request #2241 from malakaganga/fix_mp_channel
Browse files Browse the repository at this point in the history
Fix infinite dispatch loop in MP due to stale channel
  • Loading branch information
malakaganga authored Nov 28, 2024
2 parents 6e831e3 + b12790a commit c56e91b
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,10 @@ public interface MessageConsumer {
* @return ID
*/
public String getId();

/**
* Re-initializes the message consumer.
* @return {@code true} if re-initialization is successful, {@code false} otherwise.
*/
public boolean reInitialize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ private void tryToDispatchToEndpoint(MessageContext messageToDispatch, Endpoint
// For each retry we need to have a fresh copy of the original message
getFreshCopyOfOriginalMessage(messageToDispatch, originalEnvelop, originalJsonInputStream);

if (messageConsumer != null && messageConsumer.isAlive()) {
if (messageConsumer != null && (messageConsumer.isAlive() || messageConsumer.reInitialize())) {
messageToDispatch.setProperty(SynapseConstants.BLOCKING_MSG_SENDER, sender);
// Clear the message context properties related to endpoint in last service invocation
Set keySet = messageToDispatch.getPropertyKeySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public MessageContext receive() {
}
}

public boolean reInitialize() {
// To keep the existing behaviour, return false
return false;
}

/**
* Ack on success message sending by processor
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ private void reconnect() throws StoreForwardException {
}
}

public boolean reInitialize() {
// To keep the existing behaviour, return false
return false;
}

private final class CachedMessage {
private Message message = null;
private MessageContext mc = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,10 @@ public InMemoryConsumer setDestination(Queue<MessageContext> queue) {
this.queue = queue;
return this;
}

@Override
public boolean reInitialize() {
// To keep the existing behaviour, return false
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,26 @@ public MessageContext receive() {
}
} else {
log.warn("The connection and channel to the RabbitMQ broker are unhealthy.");
reInitialize();
}
return null;
}

/**
* Reconnect to the RabbitMQ broker by creating a new connection and channel.
* return {@code true} if re-initialization is successful, {@code false} otherwise.
*/
public boolean reInitialize() {
try {
log.info("Cleanup and Reinitializing connection and channel for " + getId());
cleanup();
setConnection(store.createConnection());
setChannel(store.createChannel(connection));
} catch (Exception e) {
log.error("Failed to reinitialize connection and channel.", e);
return false;
}
return null;
return true;
}

/**
Expand Down Expand Up @@ -136,14 +151,30 @@ public boolean ack() {
*/
@Override
public boolean cleanup() {
if (connection != null) {
connection.abort();
try {
if (channel != null) {
channel.abort(); // Forcefully close the channel
}
} catch (Exception e) {
//ignore
} finally {
channel = null; // Ensure channel is nullified
}
channel = null;
connection = null;
return true;

try {
if (connection != null) {
connection.abort(); // Forcefully close the connection
}
} catch (Exception e) {
//ignore
} finally {
connection = null; // Ensure connection is nullified
}

return true; // Indicating cleanup was attempted
}


/**
* Check availability of connectivity with the message store
*
Expand Down

0 comments on commit c56e91b

Please sign in to comment.