Skip to content

Commit

Permalink
Message expired failure when the destination doesn't exist
Browse files Browse the repository at this point in the history
Fix Formatting
  • Loading branch information
alekseie authored and Charlie-chenchrl committed Jan 30, 2024
1 parent 8ff284e commit 07bacc3
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.activemq.command.TransactionId;

public class ReplicaDestinationFilter extends DestinationFilter {

private final boolean nextIsComposite;
private final ReplicaSourceBroker sourceBroker;
private final ReplicaRoleManagementBroker roleManagementBroker;
Expand Down Expand Up @@ -55,6 +54,14 @@ public void send(ProducerBrokerExchange producerExchange, Message messageSend) t
}
}

@Override
public boolean canGC() {
if (ReplicaRole.source == roleManagementBroker.getRole()) {
return super.canGC();
}
return false;
}

private void replicateSend(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
final ConnectionContext connectionContext = producerExchange.getConnectionContext();
if (!sourceBroker.needToReplicateSend(connectionContext, messageSend)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;

import org.apache.activemq.util.IOHelper;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -940,6 +941,12 @@ public void canHandleEventOfType_MESSAGE_SEND_newerVersion() throws Exception {
verify(replicaEventMessage, never()).acknowledge();
}

private void verifyConnectionContext() {
verify(connectionContext, times(2)).isProducerFlowControl();
verify(connectionContext, times(2)).setProducerFlowControl(true);
verify(connectionContext, times(2)).setProducerFlowControl(false);
}


@Test
public void canHandleEventOfType_FAIL_OVER() throws Exception {
Expand Down Expand Up @@ -1000,10 +1007,4 @@ public byte[] getBranchQualifier() {
}
};
}

private void verifyConnectionContext() {
verify(connectionContext, times(2)).isProducerFlowControl();
verify(connectionContext, times(2)).setProducerFlowControl(true);
verify(connectionContext, times(2)).setProducerFlowControl(false);
}
}

0 comments on commit 07bacc3

Please sign in to comment.