Skip to content

Commit

Permalink
mirrored queue does not mirror replication queues (amazon-mq#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
Charlie-chenchrl authored and kartg committed Mar 14, 2024
1 parent fa6d603 commit b3e3b19
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.replica;

import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.Message;

public class ReplicaMirroredDestinationFilter extends DestinationFilter {

private final ReplicaRoleManagementBroker roleManagementBroker;

public ReplicaMirroredDestinationFilter(Destination next, ReplicaRoleManagementBroker roleManagementBroker) {
super(next);
this.roleManagementBroker = roleManagementBroker;
}

@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
if (ReplicaSupport.isReplicationDestination(messageSend.getDestination()) ||
(roleManagementBroker.getRole() != ReplicaRole.source && getNext() instanceof DestinationFilter)) {
((DestinationFilter) getNext()).getNext().send(producerExchange, messageSend);
} else {
super.send(producerExchange, messageSend);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.replica;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.command.ActiveMQDestination;

public class ReplicaMirroredDestinationInterceptor implements DestinationInterceptor {

private final ReplicaRoleManagementBroker roleManagementBroker;

public ReplicaMirroredDestinationInterceptor(ReplicaRoleManagementBroker roleManagementBroker) {
this.roleManagementBroker = roleManagementBroker;
}

@Override
public Destination intercept(Destination destination) {
return new ReplicaMirroredDestinationFilter(destination, roleManagementBroker);
}

@Override
public void remove(Destination destination) {
}

@Override
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageId;
Expand Down Expand Up @@ -85,6 +86,7 @@ public ReplicaRoleManagementBroker(Broker broker, ReplicaPolicy replicaPolicy, R
replicaBroker = buildReplicaBroker(queueProvider);

addInterceptor4CompositeQueues();
addInterceptor4MirroredQueues();
}

@Override
Expand Down Expand Up @@ -203,6 +205,27 @@ private void addInterceptor4CompositeQueues() {
interceptors[interceptors.length - 1] = new ReplicaDestinationInterceptor(sourceBroker, this);
compositeInterceptor.setInterceptors(interceptors);
}

private void addInterceptor4MirroredQueues() {
final RegionBroker regionBroker = (RegionBroker) broker.getAdaptor(RegionBroker.class);
final CompositeDestinationInterceptor compositeInterceptor = (CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor();
DestinationInterceptor[] interceptors = compositeInterceptor.getInterceptors();
int index = -1;
for (int i = 0; i < interceptors.length; i++) {
if (interceptors[i] instanceof MirroredQueue) {
index = i;
break;
}
}
if (index < 0) {
return;
}
DestinationInterceptor[] newInterceptors = new DestinationInterceptor[interceptors.length + 1];
System.arraycopy(interceptors, 0, newInterceptors, 0, index + 1);
System.arraycopy(interceptors, index + 1, newInterceptors, index + 2, interceptors.length - index - 1);
newInterceptors[index + 1] = new ReplicaMirroredDestinationInterceptor(this);
compositeInterceptor.setInterceptors(newInterceptors);
}

private MutativeRoleBroker getNextByRole() {
switch (role) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,9 @@ private ReplicaSupport() {

public static final Set<String> DELETABLE_REPLICATION_DESTINATION_NAMES = Set.of(MAIN_REPLICATION_QUEUE_NAME,
INTERMEDIATE_REPLICATION_QUEUE_NAME, SEQUENCE_REPLICATION_QUEUE_NAME);

public static final Set<String> REPLICATION_QUEUE_NAMES = Set.of(MAIN_REPLICATION_QUEUE_NAME,
INTERMEDIATE_REPLICATION_QUEUE_NAME, SEQUENCE_REPLICATION_QUEUE_NAME, REPLICATION_ROLE_QUEUE_NAME);
public static final Set<String> REPLICATION_TOPIC_NAMES = Set.of(MAIN_REPLICATION_QUEUE_NAME,
INTERMEDIATE_REPLICATION_QUEUE_NAME, SEQUENCE_REPLICATION_QUEUE_NAME, REPLICATION_ROLE_QUEUE_NAME);
public static final Set<String> REPLICATION_TOPIC_NAMES = Set.of(REPLICATION_ROLE_ADVISORY_TOPIC_NAME);

public static final Set<String> REPLICATION_DESTINATION_NAMES = Stream.concat(REPLICATION_QUEUE_NAMES.stream(),
REPLICATION_TOPIC_NAMES.stream()).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.broker.replica;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class ReplicaPluginMirrorQueueTest extends ReplicaPluginTestSupport {

protected Connection firstBrokerConnection;
protected Connection secondBrokerConnection;

@Override
protected void setUp() throws Exception {

}
@Override
protected void tearDown() throws Exception {
if (firstBrokerConnection != null) {
firstBrokerConnection.close();
firstBrokerConnection = null;
}
if (secondBrokerConnection != null) {
secondBrokerConnection.close();
secondBrokerConnection = null;
}

super.tearDown();
}

public void testSendMessageWhenPrimaryIsMirrored() throws Exception {
firstBroker = createFirstBroker();
firstBroker.setUseMirroredQueues(true);
secondBroker = createSecondBroker();
startFirstBroker();
startSecondBroker();

firstBrokerConnectionFactory = new ActiveMQConnectionFactory(firstBindAddress);
secondBrokerConnectionFactory = new ActiveMQConnectionFactory(secondBindAddress);
destination = createDestination();
firstBrokerConnection = firstBrokerConnectionFactory.createConnection();
firstBrokerConnection.start();
secondBrokerConnection = secondBrokerConnectionFactory.createConnection();
secondBrokerConnection.start();

waitUntilReplicationQueueHasConsumer(firstBroker);

Session firstBrokerSession = firstBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer firstBrokerProducer = firstBrokerSession.createProducer(destination);
MessageConsumer firstBrokerConsumer = firstBrokerSession.createConsumer(destination);

Session secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer secondBrokerConsumer = secondBrokerSession.createConsumer(destination);

ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
firstBrokerProducer.send(message);

Message receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT);
assertNull(receivedMessage);

receivedMessage = firstBrokerConsumer.receive(SHORT_TIMEOUT);
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
assertEquals(getName(), ((TextMessage) receivedMessage).getText());

receivedMessage.acknowledge();

Thread.sleep(LONG_TIMEOUT);
secondBrokerSession.close();
secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
secondBrokerConsumer = secondBrokerSession.createConsumer(destination);

receivedMessage = secondBrokerConsumer.receive(SHORT_TIMEOUT);
assertNull(receivedMessage);

firstBrokerSession.close();
secondBrokerSession.close();
}

public void testSendMessageWhenReplicaIsMirrored() throws Exception {
firstBroker = createFirstBroker();
secondBroker = createSecondBroker();
secondBroker.setUseMirroredQueues(true);
startFirstBroker();
startSecondBroker();

firstBrokerConnectionFactory = new ActiveMQConnectionFactory(firstBindAddress);
secondBrokerConnectionFactory = new ActiveMQConnectionFactory(secondBindAddress);
destination = createDestination();
firstBrokerConnection = firstBrokerConnectionFactory.createConnection();
firstBrokerConnection.start();
secondBrokerConnection = secondBrokerConnectionFactory.createConnection();
secondBrokerConnection.start();

waitUntilReplicationQueueHasConsumer(firstBroker);

Session firstBrokerSession = firstBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer firstBrokerProducer = firstBrokerSession.createProducer(destination);
MessageConsumer firstBrokerConsumer = firstBrokerSession.createConsumer(destination);

Session secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer secondBrokerConsumer = secondBrokerSession.createConsumer(destination);

ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
firstBrokerProducer.send(message);

Message receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT);
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
assertEquals(getName(), ((TextMessage) receivedMessage).getText());

receivedMessage = firstBrokerConsumer.receive(SHORT_TIMEOUT);
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
assertEquals(getName(), ((TextMessage) receivedMessage).getText());

receivedMessage.acknowledge();

Thread.sleep(LONG_TIMEOUT);
secondBrokerSession.close();
secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
secondBrokerConsumer = secondBrokerSession.createConsumer(destination);

receivedMessage = secondBrokerConsumer.receive(SHORT_TIMEOUT);
assertNull(receivedMessage);

firstBrokerSession.close();
secondBrokerSession.close();
}

public void testSendMessageWhenBothSidesMirrored() throws Exception {
firstBroker = createFirstBroker();
firstBroker.setUseMirroredQueues(true);
secondBroker = createSecondBroker();
secondBroker.setUseMirroredQueues(true);
startFirstBroker();
startSecondBroker();

firstBrokerConnectionFactory = new ActiveMQConnectionFactory(firstBindAddress);
secondBrokerConnectionFactory = new ActiveMQConnectionFactory(secondBindAddress);
destination = createDestination();
firstBrokerConnection = firstBrokerConnectionFactory.createConnection();
firstBrokerConnection.start();
secondBrokerConnection = secondBrokerConnectionFactory.createConnection();
secondBrokerConnection.start();

waitUntilReplicationQueueHasConsumer(firstBroker);

Session firstBrokerSession = firstBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer firstBrokerProducer = firstBrokerSession.createProducer(destination);
MessageConsumer firstBrokerConsumer = firstBrokerSession.createConsumer(destination);

Session secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer secondBrokerConsumer = secondBrokerSession.createConsumer(destination);

ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
firstBrokerProducer.send(message);

Message receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT);
assertNull(receivedMessage);

receivedMessage = firstBrokerConsumer.receive(SHORT_TIMEOUT);
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
assertEquals(getName(), ((TextMessage) receivedMessage).getText());

receivedMessage.acknowledge();

Thread.sleep(LONG_TIMEOUT);
secondBrokerSession.close();
secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
secondBrokerConsumer = secondBrokerSession.createConsumer(destination);

receivedMessage = secondBrokerConsumer.receive(SHORT_TIMEOUT);
assertNull(receivedMessage);

firstBrokerSession.close();
secondBrokerSession.close();
}


}

0 comments on commit b3e3b19

Please sign in to comment.