Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/replica broker #1

Draft
wants to merge 133 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
133 commits
Select commit Hold shift + click to select a range
7bf6a5a
[AMQ-8354] Add initial plugin skeleton
NikitaShupletsov Jun 8, 2022
035eff7
[AMQ-8354] Replica Broker can connect to source broker
NikitaShupletsov Jun 9, 2022
29498aa
[AMQ-8354] Replication of destination creation and removal
NikitaShupletsov Jun 10, 2022
6b1f0fc
[AMQ-8354] Replication of message send
NikitaShupletsov Jun 10, 2022
e1e2dbd
[AMQ-8354] Replication of purge queue and acknowledge message
NikitaShupletsov Jun 10, 2022
aecc133
[AMQ-8354] Replication of transaction begin, prepare, forget, rollbac…
NikitaShupletsov Jun 10, 2022
79295ad
[AMQ-8354] stop taskRunner on broker stop
NikitaShupletsov Jun 10, 2022
c3f3298
[AMQ-8354] Ensure destinations are replicated on startup
NikitaShupletsov Jun 10, 2022
348c347
[AMQ-8354] Isolate the replication queue by letting to add consumers …
NikitaShupletsov Jun 10, 2022
a0dda50
[AMQ-8354] Support of durable subscribers
NikitaShupletsov Jun 10, 2022
4506037
[AMQ-8354] Test of replication of virtual topics and expired messages.
NikitaShupletsov Jun 10, 2022
f475b55
[AMQ-8354] More tests for ReplicaSourceBroker
NikitaShupletsov Jun 10, 2022
e69ed7f
AMQ-8354 Add scheduled messages support to the replica plugin
Dm-Chebotarskyi Jun 20, 2022
e78ca61
[AMQ-8354] Move ReplicaBroker before ReplicaSourceBroker.
NikitaShupletsov Sep 12, 2022
acb299d
[AMQ-8354] Atomic send message on primary broker
NikitaShupletsov Sep 12, 2022
d4ff43e
[AMQ-8354] Different implementation of ack replication.
NikitaShupletsov Sep 12, 2022
851af7d
[AMQ-8354] Add the license
NikitaShupletsov Sep 12, 2022
d5a228f
[AMQ-8354] Add exponential replica retries.
NikitaShupletsov Jan 4, 2023
7e40ead
[AMQ-8354]
NikitaShupletsov Jan 4, 2023
e48fa5f
[AMQ-8354] Add batches for main replication queue.
NikitaShupletsov Jan 4, 2023
1b78c03
[AMQ-8354] Add compaction logic for send and ack events that cancel e…
NikitaShupletsov Jan 4, 2023
ba77112
[AMQ-8354] Do not send messages to mainQueue if there are no consumer…
NikitaShupletsov Jan 4, 2023
7f9f156
[AMQ-8354]
NikitaShupletsov Jan 4, 2023
004b459
[AMQ-8354] Add message compaction when there is no consumer.
NikitaShupletsov Jan 4, 2023
35a92dd
[AMQ-8354] Add replica batch acknowledge.
NikitaShupletsov Jan 4, 2023
a382d0a
[AMQ-8354]
NikitaShupletsov Jan 4, 2023
77512b0
[AMQ-8354] Improve Batcher to make batches bigger.
NikitaShupletsov Jan 4, 2023
752094f
[AMQ-8354]
NikitaShupletsov Jan 4, 2023
bb22ed5
[AMQ-8354] Replace ReplicaStorage with Sequence.Queue in ReplicaSeque…
NikitaShupletsov Jan 4, 2023
bb9bfc1
[AMQ-8354] Small refactoring.
NikitaShupletsov Jan 4, 2023
17956eb
Merge branch 'apache:main' into feature/replica_broker
NikitaShupletsov Jan 4, 2023
24f84c8
[AMQ-8354] Add missed licence.
NikitaShupletsov Jan 4, 2023
ec191da
[AMQ-8354] Add missed licence.
NikitaShupletsov Jan 4, 2023
d2fb99e
[AMQ-8354] Add helper methods needed for replica plugin.
NikitaShupletsov Jan 10, 2023
1fecf52
Reimplement getAllMessageIds method to avoid pontential out of memory…
NikitaShupletsov Jan 13, 2023
a32ab41
Convert AdvisoryBroker to MutableBrokerFilter
NikitaShupletsov Jan 17, 2023
0cd0fdf
Revert "Convert AdvisoryBroker to MutableBrokerFilter"
NikitaShupletsov Apr 20, 2023
abc69fa
Rollback the conversion of SchedulerBroker to MutableBrokerFilter.
NikitaShupletsov Apr 20, 2023
db18607
Remove getAllMessageIds.
NikitaShupletsov Apr 20, 2023
83c0546
Rework processDispatchNotification.
NikitaShupletsov Apr 20, 2023
49d1d76
Merge remote-tracking branch 'origin/main' into feature/replica_broke…
NikitaShupletsov Apr 20, 2023
bf7958c
Merge remote-tracking branch 'origin/main' into feature/replica_broker
NikitaShupletsov Apr 20, 2023
f1fce1e
Merge remote-tracking branch 'origin/feature/replica_broker_internal'…
NikitaShupletsov Apr 20, 2023
6e13d2c
[AMQ-8354] Fix replica sequence saving and parsing.
NikitaShupletsov Apr 20, 2023
a95b37f
[AMQ-8354] NoB support.
NikitaShupletsov Apr 20, 2023
7c6949c
[AMQ-8354] Extract constants to parameters.
NikitaShupletsov Apr 20, 2023
c4d51ef
[AMQ-8354] Fix errors during DLQ messages replication.
NikitaShupletsov Apr 20, 2023
2e08307
[AMQ-8354] Add JMX controllers for the failover
NikitaShupletsov Apr 20, 2023
682389b
[AMQ-8354] Fix btoker.stop() when there is a blocking operation on re…
NikitaShupletsov Apr 20, 2023
f4dbc71
[AMQ-8354] ReplicaBroker stop: close listener before close event cons…
NikitaShupletsov Apr 20, 2023
cce67aa
[AMQ-8354] Small refactoring.
NikitaShupletsov Apr 20, 2023
e19ad66
[AMQ-8354] Block consumption from replication queues when wildcard se…
NikitaShupletsov Apr 20, 2023
6f781ec
[AMQ-8354] Add deinitialization of ReplicaSequencer.
NikitaShupletsov Apr 20, 2023
8724e97
[AMQ-8354] Disable replication of non-persistent messages.
NikitaShupletsov Apr 20, 2023
a04803f
[AMQ-8354] Add failover support for replication
NikitaShupletsov Apr 20, 2023
09ca5f5
[AMQ-8354] Fix compaction with additional messages.
NikitaShupletsov Apr 20, 2023
6382d56
[AMQ-8354] Enable replication queue protection for replica.
NikitaShupletsov Apr 20, 2023
d1d5938
[AMQ-8354] ReplicationPlugin consumer is aborted by abortSlowAckConsu…
NikitaShupletsov Apr 20, 2023
639715e
[AMQ-8354] Fixed sequence recovery.
NikitaShupletsov Apr 20, 2023
8e479f3
[AMQ-8354] topic unsubscribe durable subscription event is not replic…
NikitaShupletsov Apr 20, 2023
837366f
[AMQ-8354] Replication queues purge notification
NikitaShupletsov Apr 20, 2023
6ec52b0
[AMQ-8354] fix acknowledge replication message from AMQP protocol
NikitaShupletsov Apr 20, 2023
87bdc6c
[AMQ-8354] Soft failover implementation
NikitaShupletsov Apr 20, 2023
c23f5eb
[AMQ-8354] Change batch ack to individual acks.
NikitaShupletsov Apr 20, 2023
ffe7fdd
[AMQ-8354] Fix transactions in compactor.
NikitaShupletsov Apr 20, 2023
1aa6289
[AMQ-8354] Replication event is out of order. Current sequence 1 belo…
NikitaShupletsov Apr 20, 2023
8d9b7be
[AMQ-8354] Isolate context to prevent concurrent modifications.
NikitaShupletsov Apr 20, 2023
1de0bfc
[AMQ-8354] Add web console access control.
NikitaShupletsov Apr 20, 2023
531b3f4
[AMQ-8354] Save Broker failover state and make failover more resilien…
NikitaShupletsov Apr 20, 2023
a7cde2d
[AMQ-8354] Fix send to main queue when there is only one message in t…
NikitaShupletsov Apr 20, 2023
2c1a9dc
[AMQ-8354] Remove scheduled messages replication support.
NikitaShupletsov Apr 20, 2023
471d5dc
[AMQ-8354] bug fix - Virtual destinations replication works incorrectly
NikitaShupletsov Apr 20, 2023
3bb6552
[AMQ-8354] Fix acks when they are a single message in a batch.
NikitaShupletsov Apr 20, 2023
0bf290a
[AMQ-8354] Ignore MessageAck and Transactional replication events if …
NikitaShupletsov Apr 20, 2023
41c36a5
[AMQ-8354] Redelivery plugin support.
NikitaShupletsov Apr 20, 2023
fcb676c
[AMQ-8354] Refactoring.
NikitaShupletsov Apr 26, 2023
9450293
[AMQ-8354] Add handling failures during failover on replica side.
NikitaShupletsov Apr 26, 2023
c5695d6
[AMQ-8354] Fix Not authorized to access destination: topic://ActiveMQ…
NikitaShupletsov Apr 26, 2023
7a0451f
[AMQ-8354] Fix role switching when there is an ongoing failover already.
NikitaShupletsov Apr 26, 2023
ae68560
[AMQ-8354] Less invasive implementation of advisory suppresor.
NikitaShupletsov Apr 26, 2023
e92b5d5
[AMQ-8354] Small refactoring.
NikitaShupletsov Apr 26, 2023
44216d2
[AMQ-8354] Fix and add tests.
NikitaShupletsov Apr 26, 2023
1df8656
[AMQ-8354] Replication plugin basic functionality tests - part 1.
NikitaShupletsov Apr 26, 2023
28c6ef6
[AMQ-8354] Replication plugin basic function tests - replication even…
NikitaShupletsov Apr 26, 2023
186a932
[AMQ-8354] Replication plugin basic function test - Replication event…
NikitaShupletsov Apr 26, 2023
0c52cfb
[AMQ-8354] plugin test: queue operations
NikitaShupletsov Apr 26, 2023
0fc8143
[AMQ-8354] plugin test: Topic operations
NikitaShupletsov Apr 26, 2023
75e4a06
[AMQ-8354] add message property replication test
NikitaShupletsov Apr 26, 2023
f7b1956
[AMQ-8354] replication NetworkConnector tests
NikitaShupletsov Apr 26, 2023
0cc90b4
[AMQ-8354] plugin Connection level protocol connection tests
NikitaShupletsov Apr 26, 2023
d434315
[AMQ-8354] added Connection mode tests and amqp connection test
NikitaShupletsov Apr 26, 2023
48e6360
[AMQ-8354] Add Replication Queue Operations Tests
NikitaShupletsov Apr 26, 2023
49b5a67
[AMQ-8354] Enable testDurableSubscribers
NikitaShupletsov Apr 26, 2023
9890a3a
[AMQ-8354] Replication test: hard failover
NikitaShupletsov Apr 27, 2023
48455db
[AMQ-8354] refactor integration tests
NikitaShupletsov Apr 27, 2023
6d72d74
[AMQ-8354] Replication Tests: soft failover tests
NikitaShupletsov Apr 27, 2023
e7bf5c6
[AMQ-8354] refactor integration tests
NikitaShupletsov Apr 27, 2023
4ab19f7
[AMQ-8354] Fix integration tests.
NikitaShupletsov Apr 27, 2023
b670d43
[AMQ-8354] add replication redelivery plugin test
NikitaShupletsov Apr 27, 2023
5e9bcec
[AMQ-8354] fix Replica Plugin Queue Test
NikitaShupletsov Apr 27, 2023
f0aa0cc
[AMQ-8354] Remove unused imports.
NikitaShupletsov May 4, 2023
47e37b5
[AMQ-8354] Add missing licenses.
NikitaShupletsov May 5, 2023
2dd3c5e
[AMQ-8354] Fix classloader issue.
NikitaShupletsov May 12, 2023
519322e
[AMQ-8354] Add heart beat messages.
NikitaShupletsov May 25, 2023
117c9cd
[AMQ-8354] Add versioning.
NikitaShupletsov Jun 3, 2023
43b6775
[AMQ-8354] Throw exception on replication errors.
NikitaShupletsov Jun 3, 2023
213802e
[AMQ-8354] Delete TODOs and FIXMEs
NikitaShupletsov Jun 3, 2023
b5b3975
[AMQ-8354] add replication lag and wait time metrics.
NikitaShupletsov Jun 3, 2023
eb7a9c5
Merge remote-tracking branch 'origin/main' into feature/replica_broker
NikitaShupletsov Jun 16, 2023
5c580fb
[AMQ-8394] Don't disable jolokia server.
NikitaShupletsov Jun 17, 2023
4ac7d1c
add replication plugin test profile: replica-plugin
Charlie-chenchrl Jun 19, 2023
600f8b6
Merge branch 'apache:main' into feature/replica_broker
NikitaShupletsov Jun 26, 2023
b33dfbc
Fix slow ack replication.
NikitaShupletsov Jun 26, 2023
1dfc179
[AMQ-8354] Fix Replication event is out of order on broker restart.
NikitaShupletsov Jul 19, 2023
c371eb3
[AMQ-8354] Force producer flow control.
NikitaShupletsov Jul 19, 2023
97fc15e
[AMQ-8354] Add replication flow control.
NikitaShupletsov Jul 19, 2023
47934f0
[AMQ-8354] Add an error log if a replication message is being sent to…
NikitaShupletsov Jul 19, 2023
3ccb887
[AMQ-8354] Add JMX metric to monitor replication flow control.
NikitaShupletsov Jul 19, 2023
8ff284e
mirrored queue does not mirror replication queues (#25)
Charlie-chenchrl Jan 24, 2024
07bacc3
Message expired failure when the destination doesn't exist
alekseie Nov 18, 2023
ba17974
fix flaky Replication Integration tests
Charlie-chenchrl Jun 6, 2023
bd5aa04
fix flaky test: ReplicaAcknowledgeReplicationEventTest
Charlie-chenchrl Jun 8, 2023
136d056
[AMQ-8354] Fix deinitialization of ReplicaBroker.
NikitaShupletsov Feb 28, 2024
faa08a0
[AMQ-8354] Fix replication of durable subscribers.
NikitaShupletsov May 8, 2024
a271b35
[AMQ-8354] Fix tests
NikitaShupletsov Jun 13, 2024
8de0322
[AMQ-8354] Add getMessagesUntilMatches method.
NikitaShupletsov Jun 13, 2024
5f825bb
[AMQ-8354] Add logic to load more messages if we can't find last mess…
NikitaShupletsov Jun 13, 2024
a92ecb4
Fix casting to Topic if there are other interceptors.
NikitaShupletsov Jun 18, 2024
2532299
[AMQ-8354] Update mirrored queues tests to include virtual topic inte…
NikitaShupletsov Jun 18, 2024
d1eb0bc
[AMQ-8354] Fix thread leak when replication connection is failing to …
NikitaShupletsov Jun 18, 2024
8edaa17
[AMQ-8354] Add the ability to hide replication destinations.
NikitaShupletsov Jun 27, 2024
2dc778c
[AMQ-8354] Fix broken tests.
NikitaShupletsov Jun 27, 2024
a35a6a5
[AMQ-8354] Fix unit tests.
NikitaShupletsov Jun 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.test.TestSupport;
import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Collection;

@RunWith(Parameterized.class)
public class ReplicaPluginAmqpConnectionTest extends TestSupport {

private static final Logger LOG = LoggerFactory.getLogger(ReplicaPluginAmqpConnectionTest.class);
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
private final SpringSslContext sslContext = new SpringSslContext();
private static final long LONG_TIMEOUT = 15000;

public static final String PRIMARY_BROKER_CONFIG = "org/apache/activemq/transport/amqp/transport-protocol-test-primary.xml";
public static final String REPLICA_BROKER_CONFIG = "org/apache/activemq/transport/amqp/transport-protocol-test-replica.xml";
private final String protocol;
protected BrokerService firstBroker;
protected BrokerService secondBroker;
private JmsConnection firstBrokerConnection;
private JmsConnection secondBrokerConnection;
protected ActiveMQDestination destination;

@Before
public void setUp() throws Exception {
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
SSLContext.setDefault(ctx);
final File classesDir = new File(AmqpConnection.class.getProtectionDomain().getCodeSource().getLocation().getFile());
File keystore = new File(classesDir, "../../src/test/resources/keystore");
final SpringSslContext sslContext = new SpringSslContext();
sslContext.setKeyStore(keystore.getCanonicalPath());
sslContext.setKeyStorePassword("password");
sslContext.setTrustStore(keystore.getCanonicalPath());
sslContext.setTrustStorePassword("password");
sslContext.afterPropertiesSet();
System.setProperty("javax.net.ssl.trustStore", keystore.getCanonicalPath());
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStore", keystore.getCanonicalPath());
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);

firstBroker = setUpBrokerService(PRIMARY_BROKER_CONFIG);
secondBroker = setUpBrokerService(REPLICA_BROKER_CONFIG);

firstBroker.start();
secondBroker.start();
firstBroker.waitUntilStarted();
secondBroker.waitUntilStarted();

destination = new ActiveMQQueue(getClass().getName());
}

@After
public void tearDown() throws Exception {
firstBrokerConnection.close();
secondBrokerConnection.close();
if (firstBroker != null) {
try {
firstBroker.stop();
firstBroker.waitUntilStopped();
} catch (Exception e) {
}
}
if (secondBroker != null) {
try {
secondBroker.stop();
secondBroker.waitUntilStopped();
} catch (Exception e) {
}
}
}

@Parameterized.Parameters(name="protocol={0}")
public static Collection<String[]> getTestParameters() {
return Arrays.asList(new String[][] {
{"amqp"}, {"amqp+ssl"}, {"amqp+nio+ssl"}, {"amqp+nio"},
});
}

@Test
@Ignore
public void messageSendAndReceive() throws Exception {
JmsConnectionFactory firstBrokerFactory = createConnectionFactory(firstBroker.getTransportConnectorByScheme(protocol));
firstBrokerConnection = (JmsConnection) firstBrokerFactory.createConnection();
firstBrokerConnection.setClientID("testMessageSendAndReceive-" + System.currentTimeMillis());
secondBrokerConnection = (JmsConnection) createConnectionFactory(secondBroker.getTransportConnectorByScheme(protocol)).createConnection();
secondBrokerConnection.setClientID("testMessageSendAndReceive-" + System.currentTimeMillis());
firstBrokerConnection.start();
secondBrokerConnection.start();

Session firstBrokerSession = firstBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session secondBrokerSession = secondBrokerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer firstBrokerProducer = firstBrokerSession.createProducer(destination);
MessageConsumer secondBrokerConsumer = secondBrokerSession.createConsumer(destination);


ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
firstBrokerProducer.send(message);

Message receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT);
assertEquals(getName(), ((TextMessage) receivedMessage).getText());


Connection firstBrokerConsumerConnection = JMSClientContext.INSTANCE.createConnection(URI.create(protocol + "://localhost:" + firstBroker.getTransportConnectorByScheme(protocol).getConnectUri().getPort()));
firstBrokerConsumerConnection.start();
Session firstBrokerConsumerSession = firstBrokerConsumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer firstBrokerConsumer = firstBrokerConsumerSession.createConsumer(destination);
receivedMessage = firstBrokerConsumer.receive(LONG_TIMEOUT);
assertNotNull(receivedMessage);
receivedMessage.acknowledge();

receivedMessage = secondBrokerConsumer.receive(LONG_TIMEOUT);
assertNull(receivedMessage);

firstBrokerSession.close();
secondBrokerSession.close();
}

private JmsConnectionFactory createConnectionFactory(TransportConnector connector) throws IOException, URISyntaxException {
return new JmsConnectionFactory(protocol + "://localhost:" + connector.getConnectUri().getPort());
}

public ReplicaPluginAmqpConnectionTest(String protocol) {
this.protocol = protocol;
}

protected BrokerService setUpBrokerService(String configurationUri) throws Exception {
BrokerService broker = createBroker(configurationUri);
broker.setPersistent(false);
broker.setSslContext(sslContext);
return broker;
}

protected BrokerService createBroker(String uri) throws Exception {
LOG.info("Loading broker configuration from the classpath with URI: " + uri);
return BrokerFactory.createBroker(new URI("xbean:" + uri));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<broker useJmx="false" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true">

<destinations>
<queue physicalName="TEST.Q" />
</destinations>

<transportConnectors>
<transportConnector name="vm" uri="vm://firstbroker" />
<transportConnector name="amqp" uri="amqp://0.0.0.0:0"/>
<transportConnector name="amqp+nio" uri="amqp+nio://0.0.0.0:0"/>
<transportConnector name="amqp+nio+ssl" uri="amqp+nio+ssl://0.0.0.0:0"/>
<transportConnector name="amqp+ssl" uri="amqp+ssl://0.0.0.0:0"/>
</transportConnectors>

<plugins>
<replicaPlugin role="source" transportConnectorUri="tcp://0.0.0.0:61618" otherBrokerUri="tcp://localhost:61619" userName="replicaUser" password="replica"/>
</plugins>
</broker>

</beans>
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<broker useJmx="false" persistent="false" xmlns="http://activemq.apache.org/schema/core" populateJMSXUserID="true">

<destinations>
<queue physicalName="TEST.Q" />
</destinations>

<transportConnectors>
<transportConnector name="vm" uri="vm://secondbroker" />
<transportConnector name="amqp" uri="amqp://0.0.0.0:0"/>
<transportConnector name="amqp+nio" uri="amqp+nio://0.0.0.0:0"/>
<transportConnector name="amqp+nio+ssl" uri="amqp+nio+ssl://0.0.0.0:0"/>
<transportConnector name="amqp+ssl" uri="amqp+ssl://0.0.0.0:0"/>
</transportConnectors>

<plugins>
<replicaPlugin role="replica" transportConnectorUri="tcp://0.0.0.0:61619" otherBrokerUri="tcp://localhost:61618" userName="replicaUser" password="replica"/>
</plugins>
</broker>

</beans>
11 changes: 11 additions & 0 deletions activemq-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.11.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<reporting>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,5 +412,5 @@ public interface Broker extends Region, Service {

void networkBridgeStopped(BrokerInfo brokerInfo);


void queuePurged(ConnectionContext context, ActiveMQDestination destination);
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,4 +413,9 @@ public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex,
public void networkBridgeStopped(BrokerInfo brokerInfo) {
getNext().networkBridgeStopped(brokerInfo);
}

@Override
public void queuePurged(ConnectionContext context, ActiveMQDestination destination) {
getNext().queuePurged(context, destination);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public String getUserName() {
return userName;
}

protected void setUserName(String userName) {
public void setUserName(String userName) {
this.userName = userName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,7 @@ public ThreadPoolExecutor getExecutor() {
return null;
}

@Override
public void queuePurged(ConnectionContext context, ActiveMQDestination destination) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,4 +410,9 @@ public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex,
public void networkBridgeStopped(BrokerInfo brokerInfo) {
throw new BrokerStoppedException(this.message);
}

@Override
public void queuePurged(ConnectionContext context, ActiveMQDestination destination) {
throw new BrokerStoppedException(this.message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -163,6 +164,8 @@ public void add(MessageReference node) throws Exception {
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
synchronized(pendingLock) {
try {
okForAckAsDispatchDone.countDown();

pending.reset();
while (pending.hasNext()) {
MessageReference node = pending.next();
Expand Down Expand Up @@ -566,6 +569,12 @@ public void setPending(PendingMessageCursor pending) {
}
}

public List<MessageReference> getDispatched() {
synchronized(dispatchLock) {
return new ArrayList<>(dispatched);
}
}

@Override
public void add(ConnectionContext context, Destination destination) throws Exception {
synchronized(pendingLock) {
Expand Down
Loading