From b12790a5ac95981543feaca0d6f0328e25e6de39 Mon Sep 17 00:00:00 2001 From: malakaganga Date: Thu, 28 Nov 2024 13:05:49 +0530 Subject: [PATCH] Fix infinite dispatch loop in MP due to stale channel The issue occurred when the message processor repeatedly attempted to dispatch a message in a loop, leaving an unacknowledged message in the RabbitMQ queue. Over time, the channel to the queue became stale. Although there is a check in the dispatch logic for a healthy store connection, this check failed to terminate the loop, causing the dispatch process to run indefinitely. This commit addresses the issue by ensuring the loop exits when the channel becomes stale or the store connection is deemed unhealthy. Fixes: https://github.com/wso2/product-micro-integrator/issues/3771 --- .../synapse/message/MessageConsumer.java | 6 +++ .../impl/forwarder/ForwardingService.java | 2 +- .../message/store/impl/jdbc/JDBCConsumer.java | 5 +++ .../message/store/impl/jms/JmsConsumer.java | 5 +++ .../store/impl/memory/InMemoryConsumer.java | 6 +++ .../store/impl/rabbitmq/RabbitMQConsumer.java | 43 ++++++++++++++++--- 6 files changed, 60 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/message/MessageConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/MessageConsumer.java index 18da251a14..ad3cb8376f 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/MessageConsumer.java +++ b/modules/core/src/main/java/org/apache/synapse/message/MessageConsumer.java @@ -62,4 +62,10 @@ public interface MessageConsumer { * @return ID */ public String getId(); + + /** + * Re-initializes the message consumer. + * @return {@code true} if re-initialization is successful, {@code false} otherwise. + */ + public boolean reInitialize(); } diff --git a/modules/core/src/main/java/org/apache/synapse/message/processor/impl/forwarder/ForwardingService.java b/modules/core/src/main/java/org/apache/synapse/message/processor/impl/forwarder/ForwardingService.java index 7db3c822a3..294b269c64 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/processor/impl/forwarder/ForwardingService.java +++ b/modules/core/src/main/java/org/apache/synapse/message/processor/impl/forwarder/ForwardingService.java @@ -681,7 +681,7 @@ private void tryToDispatchToEndpoint(MessageContext messageToDispatch, Endpoint // For each retry we need to have a fresh copy of the original message getFreshCopyOfOriginalMessage(messageToDispatch, originalEnvelop, originalJsonInputStream); - if (messageConsumer != null && messageConsumer.isAlive()) { + if (messageConsumer != null && (messageConsumer.isAlive() || messageConsumer.reInitialize())) { messageToDispatch.setProperty(SynapseConstants.BLOCKING_MSG_SENDER, sender); // Clear the message context properties related to endpoint in last service invocation Set keySet = messageToDispatch.getPropertyKeySet(); diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCConsumer.java index e40db909cf..1d0dc9e5c9 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCConsumer.java +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCConsumer.java @@ -90,6 +90,11 @@ public MessageContext receive() { } } + public boolean reInitialize() { + // To keep the existing behaviour, return false + return false; + } + /** * Ack on success message sending by processor * diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jms/JmsConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jms/JmsConsumer.java index 2bc40b4b31..7aa5d90893 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jms/JmsConsumer.java +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jms/JmsConsumer.java @@ -283,6 +283,11 @@ private void reconnect() throws StoreForwardException { } } + public boolean reInitialize() { + // To keep the existing behaviour, return false + return false; + } + private final class CachedMessage { private Message message = null; private MessageContext mc = null; diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/memory/InMemoryConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/memory/InMemoryConsumer.java index f605314902..0c583e7d7e 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/store/impl/memory/InMemoryConsumer.java +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/memory/InMemoryConsumer.java @@ -98,4 +98,10 @@ public InMemoryConsumer setDestination(Queue queue) { this.queue = queue; return this; } + + @Override + public boolean reInitialize() { + // To keep the existing behaviour, return false + return false; + } } diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/rabbitmq/RabbitMQConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/rabbitmq/RabbitMQConsumer.java index b6fd6ac720..355414aaac 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/store/impl/rabbitmq/RabbitMQConsumer.java +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/rabbitmq/RabbitMQConsumer.java @@ -92,11 +92,26 @@ public MessageContext receive() { } } else { log.warn("The connection and channel to the RabbitMQ broker are unhealthy."); + reInitialize(); + } + return null; + } + + /** + * Reconnect to the RabbitMQ broker by creating a new connection and channel. + * return {@code true} if re-initialization is successful, {@code false} otherwise. + */ + public boolean reInitialize() { + try { + log.info("Cleanup and Reinitializing connection and channel for " + getId()); cleanup(); setConnection(store.createConnection()); setChannel(store.createChannel(connection)); + } catch (Exception e) { + log.error("Failed to reinitialize connection and channel.", e); + return false; } - return null; + return true; } /** @@ -136,14 +151,30 @@ public boolean ack() { */ @Override public boolean cleanup() { - if (connection != null) { - connection.abort(); + try { + if (channel != null) { + channel.abort(); // Forcefully close the channel + } + } catch (Exception e) { + //ignore + } finally { + channel = null; // Ensure channel is nullified } - channel = null; - connection = null; - return true; + + try { + if (connection != null) { + connection.abort(); // Forcefully close the connection + } + } catch (Exception e) { + //ignore + } finally { + connection = null; // Ensure connection is nullified + } + + return true; // Indicating cleanup was attempted } + /** * Check availability of connectivity with the message store *