From 90ee78a79e3e442b3e0f548374d1d85b5ea013b5 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Wed, 24 Jan 2024 15:24:47 -0800 Subject: [PATCH] mirrored queue does not mirror replication queues (#25) --- .../ReplicaMirroredDestinationFilter.java | 43 ++++ ...ReplicaMirroredDestinationInterceptor.java | 45 ++++ .../replica/ReplicaRoleManagementBroker.java | 23 ++ .../activemq/replica/ReplicaSupport.java | 4 +- .../replica/ReplicaPluginMirrorQueueTest.java | 207 ++++++++++++++++++ 5 files changed, 319 insertions(+), 3 deletions(-) create mode 100644 activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationFilter.java create mode 100644 activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationInterceptor.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicaPluginMirrorQueueTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationFilter.java new file mode 100644 index 00000000000..3ddde277cc8 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationFilter.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.replica; + +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFilter; +import org.apache.activemq.command.Message; + +public class ReplicaMirroredDestinationFilter extends DestinationFilter { + + private final ReplicaRoleManagementBroker roleManagementBroker; + + public ReplicaMirroredDestinationFilter(Destination next, ReplicaRoleManagementBroker roleManagementBroker) { + super(next); + this.roleManagementBroker = roleManagementBroker; + } + + @Override + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { + if (ReplicaSupport.isReplicationDestination(messageSend.getDestination()) || + (roleManagementBroker.getRole() != ReplicaRole.source && getNext() instanceof DestinationFilter)) { + ((DestinationFilter) getNext()).getNext().send(producerExchange, messageSend); + } else { + super.send(producerExchange, messageSend); + } + } +} \ No newline at end of file diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationInterceptor.java new file mode 100644 index 00000000000..8255b5f087b --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaMirroredDestinationInterceptor.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.replica; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.command.ActiveMQDestination; + +public class ReplicaMirroredDestinationInterceptor implements DestinationInterceptor { + + private final ReplicaRoleManagementBroker roleManagementBroker; + + public ReplicaMirroredDestinationInterceptor(ReplicaRoleManagementBroker roleManagementBroker) { + this.roleManagementBroker = roleManagementBroker; + } + + @Override + public Destination intercept(Destination destination) { + return new ReplicaMirroredDestinationFilter(destination, roleManagementBroker); + } + + @Override + public void remove(Destination destination) { + } + + @Override + public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { + } +} \ No newline at end of file diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java index 6173e83ac8e..35669513b62 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaRoleManagementBroker.java @@ -25,6 +25,7 @@ import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.MirroredQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageId; @@ -85,6 +86,7 @@ public ReplicaRoleManagementBroker(Broker broker, ReplicaPolicy replicaPolicy, R replicaBroker = buildReplicaBroker(queueProvider); addInterceptor4CompositeQueues(); + addInterceptor4MirroredQueues(); } @Override @@ -203,6 +205,27 @@ private void addInterceptor4CompositeQueues() { interceptors[interceptors.length - 1] = new ReplicaDestinationInterceptor(sourceBroker, this); compositeInterceptor.setInterceptors(interceptors); } + + private void addInterceptor4MirroredQueues() { + final RegionBroker regionBroker = (RegionBroker) broker.getAdaptor(RegionBroker.class); + final CompositeDestinationInterceptor compositeInterceptor = (CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor(); + DestinationInterceptor[] interceptors = compositeInterceptor.getInterceptors(); + int index = -1; + for (int i = 0; i < interceptors.length; i++) { + if (interceptors[i] instanceof MirroredQueue) { + index = i; + break; + } + } + if (index < 0) { + return; + } + DestinationInterceptor[] newInterceptors = new DestinationInterceptor[interceptors.length + 1]; + System.arraycopy(interceptors, 0, newInterceptors, 0, index + 1); + System.arraycopy(interceptors, index + 1, newInterceptors, index + 2, interceptors.length - index - 1); + newInterceptors[index + 1] = new ReplicaMirroredDestinationInterceptor(this); + compositeInterceptor.setInterceptors(newInterceptors); + } private MutativeRoleBroker getNextByRole() { switch (role) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java index 587d99053c7..a109206e1f4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java @@ -65,11 +65,9 @@ private ReplicaSupport() { public static final Set DELETABLE_REPLICATION_DESTINATION_NAMES = Set.of(MAIN_REPLICATION_QUEUE_NAME, INTERMEDIATE_REPLICATION_QUEUE_NAME, SEQUENCE_REPLICATION_QUEUE_NAME); - public static final Set REPLICATION_QUEUE_NAMES = Set.of(MAIN_REPLICATION_QUEUE_NAME, INTERMEDIATE_REPLICATION_QUEUE_NAME, SEQUENCE_REPLICATION_QUEUE_NAME, REPLICATION_ROLE_QUEUE_NAME); - public static final Set REPLICATION_TOPIC_NAMES = Set.of(MAIN_REPLICATION_QUEUE_NAME, - INTERMEDIATE_REPLICATION_QUEUE_NAME, SEQUENCE_REPLICATION_QUEUE_NAME, REPLICATION_ROLE_QUEUE_NAME); + public static final Set REPLICATION_TOPIC_NAMES = Set.of(REPLICATION_ROLE_ADVISORY_TOPIC_NAME); public static final Set REPLICATION_DESTINATION_NAMES = Stream.concat(REPLICATION_QUEUE_NAMES.stream(), REPLICATION_TOPIC_NAMES.stream()).collect(Collectors.toSet()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicaPluginMirrorQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicaPluginMirrorQueueTest.java new file mode 100644 index 00000000000..721504addb7 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/replica/ReplicaPluginMirrorQueueTest.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.replica; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQTextMessage; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +public class ReplicaPluginMirrorQueueTest extends ReplicaPluginTestSupport { + + protected Connection firstBrokerConnection; + protected Connection secondBrokerConnection; + + @Override + protected void setUp() throws Exception { + + } + @Override + protected void tearDown() throws Exception { + if (firstBrokerConnection != null) { + firstBrokerConnection.close(); + firstBrokerConnection = null; + } + if (secondBrokerConnection != null) { + secondBrokerConnection.close(); + secondBrokerConnection = null; + } + + super.tearDown(); + } + + public void testSendMessageWhenPrimaryIsMirrored() throws Exception { + firstBroker = createFirstBroker(); + firstBroker.setUseMirroredQueues(true); + secondBroker = createSecondBroker(); + startFirstBroker(); + startSecondBroker(); + + firstBrokerConnectionFactory = new ActiveMQConnectionFactory(firstBindAddress); + secondBrokerConnectionFactory = new ActiveMQConnectionFactory(secondBindAddress); + destination = createDestination(); + firstBrokerConnection = firstBrokerConnectionFactory.createConnection(); + firstBrokerConnection.start(); + secondBrokerConnection = secondBrokerConnectionFactory.createConnection(); + secondBrokerConnection.start(); + + waitUntilReplicationQueueHasConsumer(firstBroker); + + Session firstBrokerSession = firstBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer firstBrokerProducer = firstBrokerSession.createProducer(destination); + MessageConsumer firstBrokerConsumer = firstBrokerSession.createConsumer(destination); + + Session secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer secondBrokerConsumer = secondBrokerSession.createConsumer(destination); + + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + firstBrokerProducer.send(message); + + Message receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT); + assertNull(receivedMessage); + + receivedMessage = firstBrokerConsumer.receive(SHORT_TIMEOUT); + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + assertEquals(getName(), ((TextMessage) receivedMessage).getText()); + + receivedMessage.acknowledge(); + + Thread.sleep(LONG_TIMEOUT); + secondBrokerSession.close(); + secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + secondBrokerConsumer = secondBrokerSession.createConsumer(destination); + + receivedMessage = secondBrokerConsumer.receive(SHORT_TIMEOUT); + assertNull(receivedMessage); + + firstBrokerSession.close(); + secondBrokerSession.close(); + } + + public void testSendMessageWhenReplicaIsMirrored() throws Exception { + firstBroker = createFirstBroker(); + secondBroker = createSecondBroker(); + secondBroker.setUseMirroredQueues(true); + startFirstBroker(); + startSecondBroker(); + + firstBrokerConnectionFactory = new ActiveMQConnectionFactory(firstBindAddress); + secondBrokerConnectionFactory = new ActiveMQConnectionFactory(secondBindAddress); + destination = createDestination(); + firstBrokerConnection = firstBrokerConnectionFactory.createConnection(); + firstBrokerConnection.start(); + secondBrokerConnection = secondBrokerConnectionFactory.createConnection(); + secondBrokerConnection.start(); + + waitUntilReplicationQueueHasConsumer(firstBroker); + + Session firstBrokerSession = firstBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer firstBrokerProducer = firstBrokerSession.createProducer(destination); + MessageConsumer firstBrokerConsumer = firstBrokerSession.createConsumer(destination); + + Session secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer secondBrokerConsumer = secondBrokerSession.createConsumer(destination); + + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + firstBrokerProducer.send(message); + + Message receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT); + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + assertEquals(getName(), ((TextMessage) receivedMessage).getText()); + + receivedMessage = firstBrokerConsumer.receive(SHORT_TIMEOUT); + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + assertEquals(getName(), ((TextMessage) receivedMessage).getText()); + + receivedMessage.acknowledge(); + + Thread.sleep(LONG_TIMEOUT); + secondBrokerSession.close(); + secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + secondBrokerConsumer = secondBrokerSession.createConsumer(destination); + + receivedMessage = secondBrokerConsumer.receive(SHORT_TIMEOUT); + assertNull(receivedMessage); + + firstBrokerSession.close(); + secondBrokerSession.close(); + } + + public void testSendMessageWhenBothSidesMirrored() throws Exception { + firstBroker = createFirstBroker(); + firstBroker.setUseMirroredQueues(true); + secondBroker = createSecondBroker(); + secondBroker.setUseMirroredQueues(true); + startFirstBroker(); + startSecondBroker(); + + firstBrokerConnectionFactory = new ActiveMQConnectionFactory(firstBindAddress); + secondBrokerConnectionFactory = new ActiveMQConnectionFactory(secondBindAddress); + destination = createDestination(); + firstBrokerConnection = firstBrokerConnectionFactory.createConnection(); + firstBrokerConnection.start(); + secondBrokerConnection = secondBrokerConnectionFactory.createConnection(); + secondBrokerConnection.start(); + + waitUntilReplicationQueueHasConsumer(firstBroker); + + Session firstBrokerSession = firstBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer firstBrokerProducer = firstBrokerSession.createProducer(destination); + MessageConsumer firstBrokerConsumer = firstBrokerSession.createConsumer(destination); + + Session secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer secondBrokerConsumer = secondBrokerSession.createConsumer(destination); + + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + firstBrokerProducer.send(message); + + Message receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT); + assertNull(receivedMessage); + + receivedMessage = firstBrokerConsumer.receive(SHORT_TIMEOUT); + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + assertEquals(getName(), ((TextMessage) receivedMessage).getText()); + + receivedMessage.acknowledge(); + + Thread.sleep(LONG_TIMEOUT); + secondBrokerSession.close(); + secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + secondBrokerConsumer = secondBrokerSession.createConsumer(destination); + + receivedMessage = secondBrokerConsumer.receive(SHORT_TIMEOUT); + assertNull(receivedMessage); + + firstBrokerSession.close(); + secondBrokerSession.close(); + } + + +} \ No newline at end of file