Skip to content

Commit

Permalink
Squashed commit for replica plugin
Browse files Browse the repository at this point in the history
[AMQ-8354] Add initial plugin skeleton

[AMQ-8354] Replica Broker can connect to source broker

[AMQ-8354] Replication of destination creation and removal

[AMQ-8354] Replication of message send

[AMQ-8354] Replication of purge queue and acknowledge message

[AMQ-8354] Replication of transaction begin, prepare, forget, rollback, commit

[AMQ-8354] stop taskRunner on broker stop

[AMQ-8354] Ensure destinations are replicated on startup

[AMQ-8354] Isolate the replication queue by letting to add consumers and producers only via the replication transport.

[AMQ-8354] Support of durable subscribers

[AMQ-8354] Test of replication of virtual topics and expired messages.

[AMQ-8354] More tests for ReplicaSourceBroker

AMQ-8354 Add scheduled messages support to the replica plugin

[AMQ-8354] Move ReplicaBroker before ReplicaSourceBroker.
Add missed try-catch in replicateDestinationCreation.
Bug fix: not deleting destination from the list of destination to replicate when deleting the destination.

[AMQ-8354] Atomic send message on primary broker

[AMQ-8354] Different implementation of ack replication.

[AMQ-8354] Add the license

[AMQ-8354] Add exponential replica retries.

[AMQ-8354]
Add an intermediate replication queue.
Make the main replication queue non-persistent.
Add logic for adding and checking sequences on both sides.

[AMQ-8354] Add batches for main replication queue.

[AMQ-8354] Add compaction logic for send and ack events that cancel each other out.

[AMQ-8354] Do not send messages to mainQueue if there are no consumers to apply message compaction on the main queue.

[AMQ-8354]
Increased the size of batches.
Made batches idempotent.

[AMQ-8354] Add message compaction when there is no consumer.

[AMQ-8354] Add replica batch acknowledge.

[AMQ-8354]
Split threads for ack and for send.
Added logic to ignore Delivered and Unmatched acks.

[AMQ-8354] Improve Batcher to make batches bigger.

[AMQ-8354]
Reduce the delay between acks.
Reduce the amount of messages in ack batches.

[AMQ-8354] Replace ReplicaStorage with Sequence.Queue in ReplicaSequencer.

[AMQ-8354] Small refactoring.

[AMQ-8354] Add missed licence.

[AMQ-8354] Add missed licence.

[AMQ-8354] Add helper methods needed for replica plugin.

Reimplement getAllMessageIds method to avoid pontential out of memory errors.

Convert AdvisoryBroker to MutableBrokerFilter

Revert "Convert AdvisoryBroker to MutableBrokerFilter"

This reverts commit a32ab41.

Rollback the conversion of SchedulerBroker to MutableBrokerFilter.

Remove getAllMessageIds.

Rework processDispatchNotification.

[AMQ-8354] Fix replica sequence saving and parsing.
Fix BrokerStoppedException handling.

[AMQ-8354] NoB support.

[AMQ-8354] Extract constants to parameters.

[AMQ-8354] Fix errors during DLQ messages replication.

[AMQ-8354] Add JMX controllers for the failover

[AMQ-8354] Fix btoker.stop() when there is a blocking operation on replica.

[AMQ-8354] ReplicaBroker stop: close listener before close event consumer

[AMQ-8354] Small refactoring.

[AMQ-8354] Block consumption from replication queues when wildcard selector used.
Block replication queue deletion.

[AMQ-8354] Add deinitialization of ReplicaSequencer.

[AMQ-8354] Disable replication of non-persistent messages.

[AMQ-8354] Add failover support for replication

[AMQ-8354] Fix compaction with additional messages.
Now when we look for additional messages, we gracefully handle the processed messages.

[AMQ-8354] Enable replication queue protection for replica.
Remove source role as it's no longer supported.

[AMQ-8354] ReplicationPlugin consumer is aborted by abortSlowAckConsumerStrategy

[AMQ-8354] Fixed sequence recovery.

[AMQ-8354] topic unsubscribe durable subscription event is not replicated

[AMQ-8354] Replication queues purge notification

[AMQ-8354] fix acknowledge replication message from AMQP protocol

[AMQ-8354] Soft failover implementation

[AMQ-8354] Change batch ack to individual acks.

[AMQ-8354] Fix transactions in compactor.
Simplify the logic to make it more error prone.

[AMQ-8354] Replication event is out of order. Current sequence 1 belongs to message with id

[AMQ-8354] Isolate context to prevent concurrent modifications.

[AMQ-8354] Add web console access control.

[AMQ-8354] Save Broker failover state and make failover more resilient to failure

[AMQ-8354] Fix send to main queue when there is only one message in the batch.
Fix sequence validation for FAIL_OVER messages.

[AMQ-8354] Remove scheduled messages replication support.

[AMQ-8354] bug fix - Virtual destinations replication works incorrectly

[AMQ-8354] Fix acks when they are a single message in a batch.

[AMQ-8354] Ignore MessageAck and Transactional replication events if corresponding entities do not exist

[AMQ-8354] Redelivery plugin support.

[AMQ-8354] Refactoring.
The goals of it was:
1. make sure the role field in ReplicaRoleManagementBroker and the role saved in the storage align.
2. force and not force failovers follow the same flow as much as possible.
3. The failover flows for source and replica look as similar as possible.
4. existing code is reused as much as possible

Bugs that were fixed:
1. connections are not blocked on start
2. after failover JMX returns wrong role
3. during soft failover there is a chance that the failover message is acked, but the role is not updated.
4. race condition during soft failover that may lead to incorrect role change

[AMQ-8354] Add handling failures during failover on replica side.

[AMQ-8354] Fix Not authorized to access destination: topic://ActiveMQ.Plugin.Replication.Role.Advisory.Topic

[AMQ-8354] Fix role switching when there is an ongoing failover already.

[AMQ-8354] Less invasive implementation of advisory suppresor.

[AMQ-8354] Small refactoring.

[AMQ-8354] Fix and add tests.

[AMQ-8354] Replication plugin basic functionality tests - part 1.

[AMQ-8354] Replication plugin basic function tests - replication event handling

[AMQ-8354] Replication plugin basic function test - Replication event Ack

[AMQ-8354] plugin test: queue operations

[AMQ-8354] plugin test: Topic operations

[AMQ-8354] add message property replication test

[AMQ-8354] replication NetworkConnector tests

[AMQ-8354] plugin Connection level protocol connection tests

[AMQ-8354] added Connection mode tests and amqp connection test

[AMQ-8354] Add Replication Queue Operations Tests

[AMQ-8354] Enable testDurableSubscribers

[AMQ-8354] Replication test: hard failover

[AMQ-8354] refactor integration tests

[AMQ-8354] Replication Tests: soft failover tests

[AMQ-8354] refactor integration tests

[AMQ-8354] Fix integration tests.

[AMQ-8354] add replication redelivery plugin test

[AMQ-8354] fix Replica Plugin Queue Test

[AMQ-8354] Remove unused imports.

[AMQ-8354] Add missing licenses.

[AMQ-8354] Fix classloader issue.
Improve failover logs.

[AMQ-8354] Add heart beat messages.

[AMQ-8354] Add versioning.

[AMQ-8354] Throw exception on replication errors.

[AMQ-8354] Delete TODOs and FIXMEs

[AMQ-8354] add replication lag and wait time metrics.

[AMQ-8394] Don't disable jolokia server.

add replication plugin test profile: replica-plugin

Fix slow ack replication.

[AMQ-8354] Fix Replication event is out of order on broker restart.

[AMQ-8354] Force producer flow control.

[AMQ-8354] Add replication flow control.

[AMQ-8354] Add an error log if a replication message is being sent to DLQ.

[AMQ-8354] Add JMX metric to monitor replication flow control.

mirrored queue does not mirror replication queues (#25)

Message expired failure when the destination doesn't exist

Fix Formatting

fix flaky Replication Integration tests

fix flaky test: ReplicaAcknowledgeReplicationEventTest

Fixup after rebase/squash

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
NikitaShupletsov authored and kenliao94 committed May 1, 2024
1 parent e42ba8d commit 275b8b9
Show file tree
Hide file tree
Showing 86 changed files with 15,859 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.test.TestSupport;
import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Collection;

@RunWith(Parameterized.class)
public class ReplicaPluginAmqpConnectionTest extends TestSupport {

private static final Logger LOG = LoggerFactory.getLogger(ReplicaPluginAmqpConnectionTest.class);
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
private final SpringSslContext sslContext = new SpringSslContext();
private static final long LONG_TIMEOUT = 15000;

public static final String PRIMARY_BROKER_CONFIG = "org/apache/activemq/transport/amqp/transport-protocol-test-primary.xml";
public static final String REPLICA_BROKER_CONFIG = "org/apache/activemq/transport/amqp/transport-protocol-test-replica.xml";
private final String protocol;
protected BrokerService firstBroker;
protected BrokerService secondBroker;
private JmsConnection firstBrokerConnection;
private JmsConnection secondBrokerConnection;
protected ActiveMQDestination destination;

@Before
public void setUp() throws Exception {
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
SSLContext.setDefault(ctx);
final File classesDir = new File(AmqpConnection.class.getProtectionDomain().getCodeSource().getLocation().getFile());
File keystore = new File(classesDir, "../../src/test/resources/keystore");
final SpringSslContext sslContext = new SpringSslContext();
sslContext.setKeyStore(keystore.getCanonicalPath());
sslContext.setKeyStorePassword("password");
sslContext.setTrustStore(keystore.getCanonicalPath());
sslContext.setTrustStorePassword("password");
sslContext.afterPropertiesSet();
System.setProperty("javax.net.ssl.trustStore", keystore.getCanonicalPath());
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStore", keystore.getCanonicalPath());
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);

firstBroker = setUpBrokerService(PRIMARY_BROKER_CONFIG);
secondBroker = setUpBrokerService(REPLICA_BROKER_CONFIG);

firstBroker.start();
secondBroker.start();
firstBroker.waitUntilStarted();
secondBroker.waitUntilStarted();

destination = new ActiveMQQueue(getClass().getName());
}

@After
public void tearDown() throws Exception {
firstBrokerConnection.close();
secondBrokerConnection.close();
if (firstBroker != null) {
try {
firstBroker.stop();
firstBroker.waitUntilStopped();
} catch (Exception e) {
}
}
if (secondBroker != null) {
try {
secondBroker.stop();
secondBroker.waitUntilStopped();
} catch (Exception e) {
}
}
}

@Parameterized.Parameters(name="protocol={0}")
public static Collection<String[]> getTestParameters() {
return Arrays.asList(new String[][] {
{"amqp"}, {"amqp+ssl"}, {"amqp+nio+ssl"}, {"amqp+nio"},
});
}

@Test
@Ignore
public void messageSendAndReceive() throws Exception {
JmsConnectionFactory firstBrokerFactory = createConnectionFactory(firstBroker.getTransportConnectorByScheme(protocol));
firstBrokerConnection = (JmsConnection) firstBrokerFactory.createConnection();
firstBrokerConnection.setClientID("testMessageSendAndReceive-" + System.currentTimeMillis());
secondBrokerConnection = (JmsConnection) createConnectionFactory(secondBroker.getTransportConnectorByScheme(protocol)).createConnection();
secondBrokerConnection.setClientID("testMessageSendAndReceive-" + System.currentTimeMillis());
firstBrokerConnection.start();
secondBrokerConnection.start();

Session firstBrokerSession = firstBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer firstBrokerProducer = firstBrokerSession.createProducer(destination);
MessageConsumer secondBrokerConsumer = secondBrokerSession.createConsumer(destination);


ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
firstBrokerProducer.send(message);

Message receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT);
assertEquals(getName(), ((TextMessage) receivedMessage).getText());


Connection firstBrokerConsumerConnection = JMSClientContext.INSTANCE.createConnection(URI.create(protocol + "://localhost:" + firstBroker.getTransportConnectorByScheme(protocol).getConnectUri().getPort()));
firstBrokerConsumerConnection.start();
Session firstBrokerConsumerSession = firstBrokerConsumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer firstBrokerConsumer = firstBrokerConsumerSession.createConsumer(destination);
receivedMessage = firstBrokerConsumer.receive(LONG_TIMEOUT);
assertNotNull(receivedMessage);
receivedMessage.acknowledge();

receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT);
assertNull(receivedMessage);

firstBrokerSession.close();
secondBrokerSession.close();
}

private JmsConnectionFactory createConnectionFactory(TransportConnector connector) throws IOException, URISyntaxException {
return new JmsConnectionFactory(protocol + "://localhost:" + connector.getConnectUri().getPort());
}

public ReplicaPluginAmqpConnectionTest(String protocol) {
this.protocol = protocol;
}

protected BrokerService setUpBrokerService(String configurationUri) throws Exception {
BrokerService broker = createBroker(configurationUri);
broker.setPersistent(false);
broker.setSslContext(sslContext);
return broker;
}

protected BrokerService createBroker(String uri) throws Exception {
LOG.info("Loading broker configuration from the classpath with URI: " + uri);
return BrokerFactory.createBroker(new URI("xbean:" + uri));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<broker useJmx="false" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true">

<destinations>
<queue physicalName="TEST.Q" />
</destinations>

<transportConnectors>
<transportConnector name="vm" uri="vm://firstbroker" />
<transportConnector name="amqp" uri="amqp://0.0.0.0:0"/>
<transportConnector name="amqp+nio" uri="amqp+nio://0.0.0.0:0"/>
<transportConnector name="amqp+nio+ssl" uri="amqp+nio+ssl://0.0.0.0:0"/>
<transportConnector name="amqp+ssl" uri="amqp+ssl://0.0.0.0:0"/>
</transportConnectors>

<plugins>
<replicaPlugin role="source" transportConnectorUri="tcp://0.0.0.0:61618" otherBrokerUri="tcp://localhost:61619" userName="replicaUser" password="replica"/>
</plugins>
</broker>

</beans>
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<broker useJmx="false" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true">

<destinations>
<queue physicalName="TEST.Q" />
</destinations>

<transportConnectors>
<transportConnector name="vm" uri="vm://secondbroker" />
<transportConnector name="amqp" uri="amqp://0.0.0.0:0"/>
<transportConnector name="amqp+nio" uri="amqp+nio://0.0.0.0:0"/>
<transportConnector name="amqp+nio+ssl" uri="amqp+nio+ssl://0.0.0.0:0"/>
<transportConnector name="amqp+ssl" uri="amqp+ssl://0.0.0.0:0"/>
</transportConnectors>

<plugins>
<replicaPlugin role="replica" transportConnectorUri="tcp://0.0.0.0:61619" otherBrokerUri="tcp://localhost:61618" userName="replicaUser" password="replica"/>
</plugins>
</broker>

</beans>
11 changes: 11 additions & 0 deletions activemq-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.11.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<reporting>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2484,6 +2484,7 @@ private QueueMessageReference getMatchingMessage(MessageDispatchNotification mes
do {
doPageIn(true, false, getMaxPageSize());
pagedInMessagesLock.readLock().lock();
List<MessageReference> list = new ArrayList<>();
try {
if (pagedInMessages.size() == size) {
// nothing new to check - mem constraint on page in
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.replica;

public class ActiveMQReplicaException extends RuntimeException {

public ActiveMQReplicaException(String message) {
super(message);
}

public ActiveMQReplicaException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.replica;

import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;

public class DestinationExtractor {

public static Queue extractQueue(Destination destination) {
return extract(destination, Queue.class);
}

static Topic extractTopic(Destination destination) {
return extract(destination, Topic.class);
}

static BaseDestination extractBaseDestination(Destination destination) {
return extract(destination, BaseDestination.class);
}

private static <T extends Destination> T extract(Destination destination, Class<T> clazz) {
Destination result = destination;
while (result != null && !clazz.isInstance(result)) {
if (result instanceof DestinationFilter) {
result = ((DestinationFilter) result).getNext();
} else {
return null;
}
}
return (T) result;
}
}
Loading

0 comments on commit 275b8b9

Please sign in to comment.