Skip to content

Commit

Permalink
Fix slow ack replication.
Browse files Browse the repository at this point in the history
  • Loading branch information
NikitaShupletsov committed Jun 27, 2023
1 parent 600f8b6 commit b33dfbc
Showing 1 changed file with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -2504,26 +2505,26 @@ private QueueMessageReference getMatchingMessage(MessageDispatchNotification mes
QueueMessageReference message = null;
MessageId messageId = messageDispatchNotification.getMessageId();

Set<MessageReference> set = new LinkedHashSet<MessageReference>();
int size = 0;
do {
doPageIn(true, false, getMaxPageSize());
pagedInMessagesLock.readLock().lock();
try {
if (!set.addAll(pagedInMessages.values())) {
if (pagedInMessages.size() == size) {
// nothing new to check - mem constraint on page in
break;
};
}
size = pagedInMessages.size();
for (MessageReference ref : pagedInMessages) {
if (ref.getMessageId().equals(messageId)) {
message = (QueueMessageReference) ref;
break;
}
}
} finally {
pagedInMessagesLock.readLock().unlock();
}
List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
if (ref.getMessageId().equals(messageId)) {
message = (QueueMessageReference) ref;
break;
}
}
} while (set.size() < this.destinationStatistics.getMessages().getCount());
} while (size < this.destinationStatistics.getMessages().getCount());

if (message == null) {
throw new JMSException("Slave broker out of sync with master - Message: "
Expand Down

0 comments on commit b33dfbc

Please sign in to comment.