diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java index 56c7bf32cf9..f544d09dbfa 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java @@ -54,6 +54,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; @@ -312,7 +313,13 @@ private void processMessage(ActiveMQMessage message, ReplicaEventType eventType, return; case REMOVE_DURABLE_CONSUMER: logger.trace("Processing replicated remove consumer"); - removeDurableConsumer((ConsumerInfo) deserializedData); + try { + removeDurableConsumer((ConsumerInfo) deserializedData, + message.getStringProperty(ReplicaSupport.CLIENT_ID_PROPERTY)); + } catch (JMSException e) { + logger.error("Failed to extract property to replicate remove consumer [{}]", deserializedData, e); + throw new Exception(e); + } return; case MESSAGE_EXPIRED: logger.trace("Processing replicated message expired"); @@ -529,15 +536,17 @@ private void addDurableConsumer(ConsumerInfo consumerInfo, String clientId) thro } } - private void removeDurableConsumer(ConsumerInfo consumerInfo) throws Exception { + private void removeDurableConsumer(ConsumerInfo consumerInfo, String clientId) throws Exception { try { ConnectionContext context = broker.getDestinations(consumerInfo.getDestination()).stream() .findFirst() .map(Destination::getConsumers) .stream().flatMap(Collection::stream) - .filter(v -> v.getConsumerInfo().getClientId().equals(consumerInfo.getClientId())) - .findFirst() + .filter(v -> v.getConsumerInfo().getSubscriptionName().equals(consumerInfo.getSubscriptionName())) .map(Subscription::getContext) + + .filter(v -> clientId == null || clientId.equals(v.getClientId())) + .findFirst() .orElse(null); if (context == null || !ReplicaSupport.REPLICATION_PLUGIN_USER_NAME.equals(context.getUserName())) { // a real consumer had stolen the context before we got the message diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSourceBroker.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSourceBroker.java index a7b3e74a310..f00b89ebc55 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSourceBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSourceBroker.java @@ -437,6 +437,8 @@ private void replicateRemoveConsumer(ConnectionContext context, ConsumerInfo con new ReplicaEvent() .setEventType(ReplicaEventType.REMOVE_DURABLE_CONSUMER) .setEventData(eventSerializer.serializeReplicationData(consumerInfo)) + .setReplicationProperty(ReplicaSupport.CLIENT_ID_PROPERTY, context.getClientId()) + .setVersion(2) ); } catch (Exception e) { logger.error("Failed to replicate adding {}", consumerInfo, e); diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java index a109206e1f4..3b4f7750c79 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSupport.java @@ -33,7 +33,8 @@ private ReplicaSupport() { // Intentionally hidden } - public static final int CURRENT_VERSION = 1; + public static final int CURRENT_VERSION = 2; + public static final int DEFAULT_VERSION = 1; public static final int INTERMEDIATE_QUEUE_PREFETCH_SIZE = 10000; diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicationMessageProducer.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicationMessageProducer.java index c3bc487da8b..f064b443093 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicationMessageProducer.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicationMessageProducer.java @@ -67,7 +67,7 @@ private void enqueueReplicaEvent(ConnectionContext connectionContext, ReplicaEve eventMessage.setContent(event.getEventData()); eventMessage.setProperties(event.getReplicationProperties()); eventMessage.setTransactionId(event.getTransactionId()); - eventMessage.setIntProperty(ReplicaSupport.VERSION_PROPERTY, event.getVersion() == null ? ReplicaSupport.CURRENT_VERSION : event.getVersion()); + eventMessage.setIntProperty(ReplicaSupport.VERSION_PROPERTY, event.getVersion() == null ? ReplicaSupport.DEFAULT_VERSION : event.getVersion()); eventMessage.setTimestamp(event.getTimestamp() == null ? System.currentTimeMillis() : event.getTimestamp()); replicaInternalMessageProducer.sendForcingFlowControl(connectionContext, eventMessage); }