diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaDestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaDestinationFilter.java index 795cad4dbbd..aef8f2c1211 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaDestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaDestinationFilter.java @@ -25,7 +25,6 @@ import org.apache.activemq.command.TransactionId; public class ReplicaDestinationFilter extends DestinationFilter { - private final boolean nextIsComposite; private final ReplicaSourceBroker sourceBroker; private final ReplicaRoleManagementBroker roleManagementBroker; @@ -55,6 +54,14 @@ public void send(ProducerBrokerExchange producerExchange, Message messageSend) t } } + @Override + public boolean canGC() { + if (ReplicaRole.source == roleManagementBroker.getRole()) { + return super.canGC(); + } + return false; + } + private void replicateSend(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { final ConnectionContext connectionContext = producerExchange.getConnectionContext(); if (!sourceBroker.needToReplicateSend(connectionContext, messageSend)) { diff --git a/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaBrokerEventListenerTest.java b/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaBrokerEventListenerTest.java index 024639d6a98..6dbe38a5f02 100644 --- a/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaBrokerEventListenerTest.java +++ b/activemq-broker/src/test/java/org/apache/activemq/replica/ReplicaBrokerEventListenerTest.java @@ -40,6 +40,7 @@ import org.apache.activemq.command.XATransactionId; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; + import org.apache.activemq.util.IOHelper; import org.junit.Before; import org.junit.Test; @@ -940,6 +941,12 @@ public void canHandleEventOfType_MESSAGE_SEND_newerVersion() throws Exception { verify(replicaEventMessage, never()).acknowledge(); } + private void verifyConnectionContext() { + verify(connectionContext, times(2)).isProducerFlowControl(); + verify(connectionContext, times(2)).setProducerFlowControl(true); + verify(connectionContext, times(2)).setProducerFlowControl(false); + } + @Test public void canHandleEventOfType_FAIL_OVER() throws Exception { @@ -1000,10 +1007,4 @@ public byte[] getBranchQualifier() { } }; } - - private void verifyConnectionContext() { - verify(connectionContext, times(2)).isProducerFlowControl(); - verify(connectionContext, times(2)).setProducerFlowControl(true); - verify(connectionContext, times(2)).setProducerFlowControl(false); - } }