Skip to content

Commit

Permalink
[AMQ-8354] Fix replication of durable subscribers.
Browse files Browse the repository at this point in the history
  • Loading branch information
NikitaShupletsov authored and kenliao94 committed May 14, 2024
1 parent 0faa9d6 commit c626f6f
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -312,7 +313,13 @@ private void processMessage(ActiveMQMessage message, ReplicaEventType eventType,
return;
case REMOVE_DURABLE_CONSUMER:
logger.trace("Processing replicated remove consumer");
removeDurableConsumer((ConsumerInfo) deserializedData);
try {
removeDurableConsumer((ConsumerInfo) deserializedData,
message.getStringProperty(ReplicaSupport.CLIENT_ID_PROPERTY));
} catch (JMSException e) {
logger.error("Failed to extract property to replicate remove consumer [{}]", deserializedData, e);
throw new Exception(e);
}
return;
case MESSAGE_EXPIRED:
logger.trace("Processing replicated message expired");
Expand Down Expand Up @@ -529,15 +536,17 @@ private void addDurableConsumer(ConsumerInfo consumerInfo, String clientId) thro
}
}

private void removeDurableConsumer(ConsumerInfo consumerInfo) throws Exception {
private void removeDurableConsumer(ConsumerInfo consumerInfo, String clientId) throws Exception {
try {
ConnectionContext context = broker.getDestinations(consumerInfo.getDestination()).stream()
.findFirst()
.map(Destination::getConsumers)
.stream().flatMap(Collection::stream)
.filter(v -> v.getConsumerInfo().getClientId().equals(consumerInfo.getClientId()))
.findFirst()
.filter(v -> v.getConsumerInfo().getSubscriptionName().equals(consumerInfo.getSubscriptionName()))
.map(Subscription::getContext)

.filter(v -> clientId == null || clientId.equals(v.getClientId()))
.findFirst()
.orElse(null);
if (context == null || !ReplicaSupport.REPLICATION_PLUGIN_USER_NAME.equals(context.getUserName())) {
// a real consumer had stolen the context before we got the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ private void replicateRemoveConsumer(ConnectionContext context, ConsumerInfo con
new ReplicaEvent()
.setEventType(ReplicaEventType.REMOVE_DURABLE_CONSUMER)
.setEventData(eventSerializer.serializeReplicationData(consumerInfo))
.setReplicationProperty(ReplicaSupport.CLIENT_ID_PROPERTY, context.getClientId())
.setVersion(2)
);
} catch (Exception e) {
logger.error("Failed to replicate adding {}", consumerInfo, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ private ReplicaSupport() {
// Intentionally hidden
}

public static final int CURRENT_VERSION = 1;
public static final int CURRENT_VERSION = 2;
public static final int DEFAULT_VERSION = 1;

public static final int INTERMEDIATE_QUEUE_PREFETCH_SIZE = 10000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private void enqueueReplicaEvent(ConnectionContext connectionContext, ReplicaEve
eventMessage.setContent(event.getEventData());
eventMessage.setProperties(event.getReplicationProperties());
eventMessage.setTransactionId(event.getTransactionId());
eventMessage.setIntProperty(ReplicaSupport.VERSION_PROPERTY, event.getVersion() == null ? ReplicaSupport.CURRENT_VERSION : event.getVersion());
eventMessage.setIntProperty(ReplicaSupport.VERSION_PROPERTY, event.getVersion() == null ? ReplicaSupport.DEFAULT_VERSION : event.getVersion());
eventMessage.setTimestamp(event.getTimestamp() == null ? System.currentTimeMillis() : event.getTimestamp());
replicaInternalMessageProducer.sendForcingFlowControl(connectionContext, eventMessage);
}
Expand Down

0 comments on commit c626f6f

Please sign in to comment.