diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationFilter.java
new file mode 100644
index 00000000000..3ddde277cc8
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationFilter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.command.Message;
+
+public class ReplicaMirroredDestinationFilter extends DestinationFilter {
+
+ private final ReplicaRoleManagementBroker roleManagementBroker;
+
+ public ReplicaMirroredDestinationFilter(Destination next, ReplicaRoleManagementBroker roleManagementBroker) {
+ super(next);
+ this.roleManagementBroker = roleManagementBroker;
+ }
+
+ @Override
+ public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
+ if (ReplicaSupport.isReplicationDestination(messageSend.getDestination()) ||
+ (roleManagementBroker.getRole() != ReplicaRole.source && getNext() instanceof DestinationFilter)) {
+ ((DestinationFilter) getNext()).getNext().send(producerExchange, messageSend);
+ } else {
+ super.send(producerExchange, messageSend);
+ }
+ }
+}
\ No newline at end of file
diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationInterceptor.java
new file mode 100644
index 00000000000..8255b5f087b
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationInterceptor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.replica;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.command.ActiveMQDestination;
+
+public class ReplicaMirroredDestinationInterceptor implements DestinationInterceptor {
+
+ private final ReplicaRoleManagementBroker roleManagementBroker;
+
+ public ReplicaMirroredDestinationInterceptor(ReplicaRoleManagementBroker roleManagementBroker) {
+ this.roleManagementBroker = roleManagementBroker;
+ }
+
+ @Override
+ public Destination intercept(Destination destination) {
+ return new ReplicaMirroredDestinationFilter(destination, roleManagementBroker);
+ }
+
+ @Override
+ public void remove(Destination destination) {
+ }
+
+ @Override
+ public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
+ }
+}
\ No newline at end of file
diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java
index 6173e83ac8e..35669513b62 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java
@@ -25,6 +25,7 @@
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageId;
@@ -85,6 +86,7 @@ public ReplicaRoleManagementBroker(Broker broker, ReplicaPolicy replicaPolicy, R
replicaBroker = buildReplicaBroker(queueProvider);
addInterceptor4CompositeQueues();
+ addInterceptor4MirroredQueues();
}
@Override
@@ -203,6 +205,27 @@ private void addInterceptor4CompositeQueues() {
interceptors[interceptors.length - 1] = new ReplicaDestinationInterceptor(sourceBroker, this);
compositeInterceptor.setInterceptors(interceptors);
}
+
+ private void addInterceptor4MirroredQueues() {
+ final RegionBroker regionBroker = (RegionBroker) broker.getAdaptor(RegionBroker.class);
+ final CompositeDestinationInterceptor compositeInterceptor = (CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor();
+ DestinationInterceptor[] interceptors = compositeInterceptor.getInterceptors();
+ int index = -1;
+ for (int i = 0; i < interceptors.length; i++) {
+ if (interceptors[i] instanceof MirroredQueue) {
+ index = i;
+ break;
+ }
+ }
+ if (index < 0) {
+ return;
+ }
+ DestinationInterceptor[] newInterceptors = new DestinationInterceptor[interceptors.length + 1];
+ System.arraycopy(interceptors, 0, newInterceptors, 0, index + 1);
+ System.arraycopy(interceptors, index + 1, newInterceptors, index + 2, interceptors.length - index - 1);
+ newInterceptors[index + 1] = new ReplicaMirroredDestinationInterceptor(this);
+ compositeInterceptor.setInterceptors(newInterceptors);
+ }
private MutativeRoleBroker getNextByRole() {
switch (role) {
diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java
index 587d99053c7..a109206e1f4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java
@@ -65,11 +65,9 @@ private ReplicaSupport() {
public static final Set DELETABLE_REPLICATION_DESTINATION_NAMES = Set.of(MAIN_REPLICATION_QUEUE_NAME,
INTERMEDIATE_REPLICATION_QUEUE_NAME, SEQUENCE_REPLICATION_QUEUE_NAME);
-
public static final Set REPLICATION_QUEUE_NAMES = Set.of(MAIN_REPLICATION_QUEUE_NAME,
INTERMEDIATE_REPLICATION_QUEUE_NAME, SEQUENCE_REPLICATION_QUEUE_NAME, REPLICATION_ROLE_QUEUE_NAME);
- public static final Set REPLICATION_TOPIC_NAMES = Set.of(MAIN_REPLICATION_QUEUE_NAME,
- INTERMEDIATE_REPLICATION_QUEUE_NAME, SEQUENCE_REPLICATION_QUEUE_NAME, REPLICATION_ROLE_QUEUE_NAME);
+ public static final Set REPLICATION_TOPIC_NAMES = Set.of(REPLICATION_ROLE_ADVISORY_TOPIC_NAME);
public static final Set REPLICATION_DESTINATION_NAMES = Stream.concat(REPLICATION_QUEUE_NAMES.stream(),
REPLICATION_TOPIC_NAMES.stream()).collect(Collectors.toSet());
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicaPluginMirrorQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicaPluginMirrorQueueTest.java
new file mode 100644
index 00000000000..721504addb7
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicaPluginMirrorQueueTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.replica;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQTextMessage;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class ReplicaPluginMirrorQueueTest extends ReplicaPluginTestSupport {
+
+ protected Connection firstBrokerConnection;
+ protected Connection secondBrokerConnection;
+
+ @Override
+ protected void setUp() throws Exception {
+
+ }
+ @Override
+ protected void tearDown() throws Exception {
+ if (firstBrokerConnection != null) {
+ firstBrokerConnection.close();
+ firstBrokerConnection = null;
+ }
+ if (secondBrokerConnection != null) {
+ secondBrokerConnection.close();
+ secondBrokerConnection = null;
+ }
+
+ super.tearDown();
+ }
+
+ public void testSendMessageWhenPrimaryIsMirrored() throws Exception {
+ firstBroker = createFirstBroker();
+ firstBroker.setUseMirroredQueues(true);
+ secondBroker = createSecondBroker();
+ startFirstBroker();
+ startSecondBroker();
+
+ firstBrokerConnectionFactory = new ActiveMQConnectionFactory(firstBindAddress);
+ secondBrokerConnectionFactory = new ActiveMQConnectionFactory(secondBindAddress);
+ destination = createDestination();
+ firstBrokerConnection = firstBrokerConnectionFactory.createConnection();
+ firstBrokerConnection.start();
+ secondBrokerConnection = secondBrokerConnectionFactory.createConnection();
+ secondBrokerConnection.start();
+
+ waitUntilReplicationQueueHasConsumer(firstBroker);
+
+ Session firstBrokerSession = firstBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer firstBrokerProducer = firstBrokerSession.createProducer(destination);
+ MessageConsumer firstBrokerConsumer = firstBrokerSession.createConsumer(destination);
+
+ Session secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer secondBrokerConsumer = secondBrokerSession.createConsumer(destination);
+
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText(getName());
+ firstBrokerProducer.send(message);
+
+ Message receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT);
+ assertNull(receivedMessage);
+
+ receivedMessage = firstBrokerConsumer.receive(SHORT_TIMEOUT);
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals(getName(), ((TextMessage) receivedMessage).getText());
+
+ receivedMessage.acknowledge();
+
+ Thread.sleep(LONG_TIMEOUT);
+ secondBrokerSession.close();
+ secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ secondBrokerConsumer = secondBrokerSession.createConsumer(destination);
+
+ receivedMessage = secondBrokerConsumer.receive(SHORT_TIMEOUT);
+ assertNull(receivedMessage);
+
+ firstBrokerSession.close();
+ secondBrokerSession.close();
+ }
+
+ public void testSendMessageWhenReplicaIsMirrored() throws Exception {
+ firstBroker = createFirstBroker();
+ secondBroker = createSecondBroker();
+ secondBroker.setUseMirroredQueues(true);
+ startFirstBroker();
+ startSecondBroker();
+
+ firstBrokerConnectionFactory = new ActiveMQConnectionFactory(firstBindAddress);
+ secondBrokerConnectionFactory = new ActiveMQConnectionFactory(secondBindAddress);
+ destination = createDestination();
+ firstBrokerConnection = firstBrokerConnectionFactory.createConnection();
+ firstBrokerConnection.start();
+ secondBrokerConnection = secondBrokerConnectionFactory.createConnection();
+ secondBrokerConnection.start();
+
+ waitUntilReplicationQueueHasConsumer(firstBroker);
+
+ Session firstBrokerSession = firstBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer firstBrokerProducer = firstBrokerSession.createProducer(destination);
+ MessageConsumer firstBrokerConsumer = firstBrokerSession.createConsumer(destination);
+
+ Session secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer secondBrokerConsumer = secondBrokerSession.createConsumer(destination);
+
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText(getName());
+ firstBrokerProducer.send(message);
+
+ Message receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT);
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals(getName(), ((TextMessage) receivedMessage).getText());
+
+ receivedMessage = firstBrokerConsumer.receive(SHORT_TIMEOUT);
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals(getName(), ((TextMessage) receivedMessage).getText());
+
+ receivedMessage.acknowledge();
+
+ Thread.sleep(LONG_TIMEOUT);
+ secondBrokerSession.close();
+ secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ secondBrokerConsumer = secondBrokerSession.createConsumer(destination);
+
+ receivedMessage = secondBrokerConsumer.receive(SHORT_TIMEOUT);
+ assertNull(receivedMessage);
+
+ firstBrokerSession.close();
+ secondBrokerSession.close();
+ }
+
+ public void testSendMessageWhenBothSidesMirrored() throws Exception {
+ firstBroker = createFirstBroker();
+ firstBroker.setUseMirroredQueues(true);
+ secondBroker = createSecondBroker();
+ secondBroker.setUseMirroredQueues(true);
+ startFirstBroker();
+ startSecondBroker();
+
+ firstBrokerConnectionFactory = new ActiveMQConnectionFactory(firstBindAddress);
+ secondBrokerConnectionFactory = new ActiveMQConnectionFactory(secondBindAddress);
+ destination = createDestination();
+ firstBrokerConnection = firstBrokerConnectionFactory.createConnection();
+ firstBrokerConnection.start();
+ secondBrokerConnection = secondBrokerConnectionFactory.createConnection();
+ secondBrokerConnection.start();
+
+ waitUntilReplicationQueueHasConsumer(firstBroker);
+
+ Session firstBrokerSession = firstBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer firstBrokerProducer = firstBrokerSession.createProducer(destination);
+ MessageConsumer firstBrokerConsumer = firstBrokerSession.createConsumer(destination);
+
+ Session secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer secondBrokerConsumer = secondBrokerSession.createConsumer(destination);
+
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText(getName());
+ firstBrokerProducer.send(message);
+
+ Message receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT);
+ assertNull(receivedMessage);
+
+ receivedMessage = firstBrokerConsumer.receive(SHORT_TIMEOUT);
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals(getName(), ((TextMessage) receivedMessage).getText());
+
+ receivedMessage.acknowledge();
+
+ Thread.sleep(LONG_TIMEOUT);
+ secondBrokerSession.close();
+ secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ secondBrokerConsumer = secondBrokerSession.createConsumer(destination);
+
+ receivedMessage = secondBrokerConsumer.receive(SHORT_TIMEOUT);
+ assertNull(receivedMessage);
+
+ firstBrokerSession.close();
+ secondBrokerSession.close();
+ }
+
+
+}
\ No newline at end of file