Skip to content

Commit

Permalink
[AMQ-8354] Add logic to load more messages if we can't find last mess…
Browse files Browse the repository at this point in the history
…age for recovery.

Add check if the broker is stopping to stop the task runners faster.
  • Loading branch information
NikitaShupletsov committed Jun 14, 2024
1 parent b77dba7 commit bbae1e9
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.MessageReferenceFilter;
import org.apache.activemq.broker.region.PrefetchSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
Expand All @@ -33,6 +35,7 @@
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.TransactionId;
Expand All @@ -46,7 +49,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import javax.jms.JMSException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -166,7 +169,7 @@ void initialize() throws Exception {
intermediateQueue.iterate();
String savedSequences = sequenceStorage.initialize(subscriptionConnectionContext);
List<String> savedSequencesToRestore = restoreSequenceStorage.initialize(subscriptionConnectionContext);
restoreSequence(savedSequences, savedSequencesToRestore);
restoreSequence(intermediateQueue, savedSequences, savedSequencesToRestore);

scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::asyncSendWakeup,
Expand Down Expand Up @@ -217,7 +220,7 @@ void deinitialize() throws Exception {

}

void restoreSequence(String savedSequence, List<String> savedSequencesToRestore) throws Exception {
void restoreSequence(Queue intermediateQueue, String savedSequence, List<String> savedSequencesToRestore) throws Exception {
if (savedSequence != null) {
String[] split = savedSequence.split("#");
if (split.length != 2) {
Expand Down Expand Up @@ -246,16 +249,37 @@ void restoreSequence(String savedSequence, List<String> savedSequencesToRestore)
break;
}
}

ConnectionContext connectionContext = createConnectionContext();

if (!found) {
throw new IllegalStateException("Can't recover sequence. Message with id: " + recoveryMessageId + " not found");
Set<String> matchingIds = matchingMessages.stream()
.map(MessageReference::getMessageId)
.map(MessageId::toString)
.collect(Collectors.toSet());

List<QueueMessageReference> extraMessages = intermediateQueue.getMessagesUntilMatches(connectionContext,
(context, mr) -> mr.getMessageId().equals(recoveryMessageId));
if (extraMessages == null) {
throw new IllegalStateException("Can't recover sequence. Message with id: " + recoveryMessageId + " not found");
}

List<MessageReference> toDispatch = new ArrayList<>();
for (MessageReference mr : extraMessages) {
if (matchingIds.contains(mr.getMessageId().toString())) {
continue;
}
matchingMessages.add(mr);
toDispatch.add(mr);
}
intermediateQueue.dispatchNotification(subscription, toDispatch);
}

TransactionId transactionId = new LocalTransactionId(
new ConnectionId(ReplicaSupport.REPLICATION_PLUGIN_CONNECTION_ID),
ReplicaSupport.LOCAL_TRANSACTION_ID_GENERATOR.getNextSequenceId());
boolean rollbackOnFail = false;

ConnectionContext connectionContext = createConnectionContext();
BigInteger sequence = null;
try {
broker.beginTransaction(connectionContext, transactionId);
Expand Down Expand Up @@ -406,7 +430,7 @@ boolean iterateAck() {
}
}

return pendingAckWakeups.get() > 0;
return !broker.getBrokerService().isStopping() && pendingAckWakeups.get() > 0;
}

private void iterateAck0() {
Expand Down Expand Up @@ -499,7 +523,7 @@ boolean iterateSend() {
}
}

return pendingSendTriggeredWakeups.get() > 0;
return !broker.getBrokerService().isStopping() && pendingSendTriggeredWakeups.get() > 0;
}

private void iterateSend0() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void setUp() throws Exception {
public void restoreSequenceWhenNoSequence() throws Exception {
sequencer.sequence = null;

sequencer.restoreSequence(null, Collections.emptyList());
sequencer.restoreSequence(intermediateQueue, null, Collections.emptyList());

assertThat(sequencer.sequence).isNull();
}
Expand All @@ -135,7 +135,7 @@ public void restoreSequenceWhenSequenceExistsButNoRecoverySequences() throws Exc
sequencer.sequence = null;

MessageId messageId = new MessageId("1:0:0:1");
sequencer.restoreSequence("1#" + messageId, Collections.emptyList());
sequencer.restoreSequence(intermediateQueue, "1#" + messageId, Collections.emptyList());
verify(replicationMessageProducer, never()).enqueueMainReplicaEvent(any(), any(ReplicaEvent.class));

assertThat(sequencer.sequence).isEqualTo(1);
Expand Down Expand Up @@ -167,7 +167,7 @@ public void restoreSequenceWhenStorageExistAndMessageDoesNotExist() throws Excep

when(intermediateSubscription.getDispatched()).thenReturn(new ArrayList<>(List.of(message1, message2, message3, message4)));

sequencer.restoreSequence("4#" + messageId4, List.of("1#" + messageId1 + "#" + messageId2, "3#" + messageId3 + "#" + messageId4));
sequencer.restoreSequence(intermediateQueue, "4#" + messageId4, List.of("1#" + messageId1 + "#" + messageId2, "3#" + messageId3 + "#" + messageId4));

assertThat(sequencer.sequence).isEqualTo(4);

Expand Down

0 comments on commit bbae1e9

Please sign in to comment.