Skip to content

Commit

Permalink
[AMQ-8354] Add helper methods needed for replica plugin.
Browse files Browse the repository at this point in the history
* added a "queuePurged" method to notify about a queue being purged, with appropriate stub implementations for EmptyBroker and ErrorBroker. This is needed for queue purge replication.

* added ConnectionContext#setUserName to be able to differentiate replication context (the context we use to interact with the broker) from every other to prevent replication of replicated events.

* added a new public "getMatchingMessages" method to be able to get messages by selector when the consumer is full. Used by the replica plugin for message compaction.

* added "getDispatched" method to fetch the list of messages we are acknowledging. This is used by the replica plugin to correctly replicate ACKs

* Bugfix in processDispatchNotification, getMatchingMessage and processMessageDispatchNotification to be able to replicate acks correctly. These code paths have not been used since pure master-slave feature was deleted, but is needed by the replica broker plugin.
  • Loading branch information
kenliao94 committed May 1, 2024
1 parent ee0edaf commit e42ba8d
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,5 +412,5 @@ public interface Broker extends Region, Service {

void networkBridgeStopped(BrokerInfo brokerInfo);


void queuePurged(ConnectionContext context, ActiveMQDestination destination);
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,4 +413,9 @@ public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex,
public void networkBridgeStopped(BrokerInfo brokerInfo) {
getNext().networkBridgeStopped(brokerInfo);
}

@Override
public void queuePurged(ConnectionContext context, ActiveMQDestination destination) {
getNext().queuePurged(context, destination);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public String getUserName() {
return userName;
}

protected void setUserName(String userName) {
public void setUserName(String userName) {
this.userName = userName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,7 @@ public ThreadPoolExecutor getExecutor() {
return null;
}

@Override
public void queuePurged(ConnectionContext context, ActiveMQDestination destination) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,4 +410,9 @@ public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex,
public void networkBridgeStopped(BrokerInfo brokerInfo) {
throw new BrokerStoppedException(this.message);
}

@Override
public void queuePurged(ConnectionContext context, ActiveMQDestination destination) {
throw new BrokerStoppedException(this.message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -163,6 +164,8 @@ public void add(MessageReference node) throws Exception {
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
synchronized(pendingLock) {
try {
okForAckAsDispatchDone.countDown();

pending.reset();
while (pending.hasNext()) {
MessageReference node = pending.next();
Expand Down Expand Up @@ -566,6 +569,12 @@ public void setPending(PendingMessageCursor pending) {
}
}

public List<MessageReference> getDispatched() {
synchronized(dispatchLock) {
return new ArrayList<>(dispatched);
}
}

@Override
public void add(ConnectionContext context, Destination destination) throws Exception {
synchronized(pendingLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
Expand All @@ -45,6 +46,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
Expand Down Expand Up @@ -1302,9 +1304,11 @@ public QueueMessageReference getMessage(String id) {
}

public void purge() throws Exception {
ConnectionContext c = createConnectionContext();
List<MessageReference> list = null;
sendLock.lock();
purge(createConnectionContext());
}

public void purge(ConnectionContext c) throws Exception {
List<MessageReference> list;
try {
long originalMessageCount = this.destinationStatistics.getMessages().getCount();
do {
Expand Down Expand Up @@ -1334,6 +1338,7 @@ public void purge() throws Exception {
} finally {
sendLock.unlock();
}
broker.queuePurged(c, destination);
}

@Override
Expand Down Expand Up @@ -1504,6 +1509,66 @@ public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilte
return movedCounter;
}

/**
* Copies the messages matching the given selector up to the maximum number
* of matched messages
*
* @return the list messages matching the selector
*/
public List<QueueMessageReference> getMatchingMessages(ConnectionContext context, String selector, int maximumMessages) throws Exception {
return getMatchingMessages(context, createSelectorFilter(selector), maximumMessages);
}

/**
* Gets the messages matching the given filter up to the maximum number of
* matched messages
*
* @return the list messages matching the filter
*/
public List<QueueMessageReference> getMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, int maximumMessages) throws Exception {
Set<QueueMessageReference> set = new LinkedHashSet<>();

pagedInMessagesLock.readLock().lock();
try {
Iterator<MessageReference> iterator = pagedInMessages.iterator();

while (iterator.hasNext() && set.size() < maximumMessages) {
QueueMessageReference qmr = (QueueMessageReference) iterator.next();
if (filter.evaluate(context, qmr)) {
set.add(qmr);
}
}
} finally {
pagedInMessagesLock.readLock().unlock();
}

if (set.size() == maximumMessages) {
return new ArrayList<>(set);
}
messagesLock.writeLock().lock();
try {
try {
messages.setMaxBatchSize(getMaxPageSize());
messages.reset();
while (messages.hasNext() && set.size() < maximumMessages) {
MessageReference mr = messages.next();
QueueMessageReference qmr = createMessageReference(mr.getMessage());
qmr.decrementReferenceCount();
messages.rollback(qmr.getMessageId());
if (filter.evaluate(context, qmr)) {
set.add(qmr);
}

}
} finally {
messages.release();
}
} finally {
messagesLock.writeLock().unlock();
}
return new ArrayList<>(set);
}

/**
* Move a message
*
Expand Down Expand Up @@ -2357,61 +2422,84 @@ public void processDispatchNotification(MessageDispatchNotification messageDispa
}
}

private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
throws Exception {
QueueMessageReference message = null;
MessageId messageId = messageDispatchNotification.getMessageId();

pagedInPendingDispatchLock.writeLock().lock();
try {
for (MessageReference ref : dispatchPendingList) {
if (messageId.equals(ref.getMessageId())) {
message = (QueueMessageReference)ref;
dispatchPendingList.remove(ref);
break;
public void dispatchNotification(Subscription sub, List<MessageReference> messageList) throws Exception {
for (MessageReference message : messageList) {
pagedInMessagesLock.writeLock().lock();
try {
if (!pagedInMessages.contains(message)) {
pagedInMessages.addMessageLast(message);
}
} finally {
pagedInMessagesLock.writeLock().unlock();
}
} finally {
pagedInPendingDispatchLock.writeLock().unlock();
}

if (message == null) {
pagedInMessagesLock.readLock().lock();
pagedInPendingDispatchLock.writeLock().lock();
try {
message = (QueueMessageReference)pagedInMessages.get(messageId);
if (dispatchPendingList.contains(message)) {
dispatchPendingList.remove(message);
}
} finally {
pagedInMessagesLock.readLock().unlock();
pagedInPendingDispatchLock.writeLock().unlock();
}
}

if (message == null) {
messagesLock.writeLock().lock();
Set<MessageId> messageIds = messageList.stream().map(MessageReference::getMessageId).collect(Collectors.toSet());
messagesLock.writeLock().lock();
try {
try {
try {
messages.setMaxBatchSize(getMaxPageSize());
messages.reset();
while (messages.hasNext()) {
MessageReference node = messages.next();
int count = 0;
messages.setMaxBatchSize(getMaxPageSize());
messages.reset();
while (messages.hasNext()) {
MessageReference node = messages.next();
if (messageIds.contains(node.getMessageId())) {
messages.remove();
if (messageId.equals(node.getMessageId())) {
message = this.createMessageReference(node.getMessage());
break;
}
count++;
}
if (count == messageIds.size()) {
break;
}
} finally {
messages.release();
}
} finally {
messagesLock.writeLock().unlock();
messages.release();
}
} finally {
messagesLock.writeLock().unlock();
}

if (message == null) {
Message msg = loadMessage(messageId);
if (msg != null) {
message = this.createMessageReference(msg);
}
for (MessageReference message : messageList) {
sub.add(message);
MessageDispatchNotification mdn = new MessageDispatchNotification();
mdn.setMessageId(message.getMessageId());
sub.processMessageDispatchNotification(mdn);
}
}

private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
throws Exception {
QueueMessageReference message = null;
MessageId messageId = messageDispatchNotification.getMessageId();

int size = 0;
do {
doPageIn(true, false, getMaxPageSize());
pagedInMessagesLock.readLock().lock();
try {
if (pagedInMessages.size() == size) {
// nothing new to check - mem constraint on page in
break;
}
size = pagedInMessages.size();
for (MessageReference ref : pagedInMessages) {
if (ref.getMessageId().equals(messageId)) {
message = (QueueMessageReference) ref;
break;
}
}
} finally {
pagedInMessagesLock.readLock().unlock();
}
} while (size < this.destinationStatistics.getMessages().getCount());

if (message == null) {
throw new JMSException("Slave broker out of sync with master - Message: "
Expand Down

0 comments on commit e42ba8d

Please sign in to comment.