diff --git a/eventmesh-retry/eventmesh-retry-pulsar/src/main/java/org/apache/eventmesh/retry/pulsar/PulsarRetryStrategyImpl.java b/eventmesh-retry/eventmesh-retry-pulsar/src/main/java/org/apache/eventmesh/retry/pulsar/PulsarRetryStrategyImpl.java index ed51602488..76834a816c 100644 --- a/eventmesh-retry/eventmesh-retry-pulsar/src/main/java/org/apache/eventmesh/retry/pulsar/PulsarRetryStrategyImpl.java +++ b/eventmesh-retry/eventmesh-retry-pulsar/src/main/java/org/apache/eventmesh/retry/pulsar/PulsarRetryStrategyImpl.java @@ -47,9 +47,9 @@ public void retry(final RetryConfiguration configuration) { private void sendMessageBack(final RetryConfiguration configuration) { CloudEvent event = configuration.getEvent(); String topic = configuration.getTopic(); + String subscriptionMode = configuration.getSubscriptionMode().getMode(); + String retryTopicName = topic + "-" + subscriptionMode + "-" + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; String consumerGroupName = configuration.getConsumerGroupName(); - String retryTopicName = consumerGroupName + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; - String bizSeqNo = Objects.requireNonNull(event.getExtension(ClientInstanceKey.BIZSEQNO.getKey())).toString(); String uniqueId = Objects.requireNonNull(event.getExtension(ClientInstanceKey.UNIQUEID.getKey())).toString(); CloudEvent retryEvent = CloudEventBuilder.from(event) diff --git a/eventmesh-storage-plugin/eventmesh-storage-pulsar/src/main/java/org/apache/eventmesh/storage/pulsar/consumer/PulsarConsumerImpl.java b/eventmesh-storage-plugin/eventmesh-storage-pulsar/src/main/java/org/apache/eventmesh/storage/pulsar/consumer/PulsarConsumerImpl.java index 6dbcd8d68c..255891edbc 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-pulsar/src/main/java/org/apache/eventmesh/storage/pulsar/consumer/PulsarConsumerImpl.java +++ b/eventmesh-storage-plugin/eventmesh-storage-pulsar/src/main/java/org/apache/eventmesh/storage/pulsar/consumer/PulsarConsumerImpl.java @@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -41,6 +42,7 @@ import java.util.Objects; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import io.cloudevents.CloudEvent; @@ -122,8 +124,19 @@ public void commit(EventMeshAction action) { String consumerKey = topic + PulsarConstant.KEY_SEPARATOR + properties.getProperty(Constants.CONSUMER_GROUP) + PulsarConstant.KEY_SEPARATOR + properties.getProperty(Constants.CLIENT_ADDRESS); + + String dlqTopic = subTopic + "-DLQ"; + + String retryTopic = subTopic + "-RETRY"; + org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer() .topic(subTopic) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder() + .deadLetterTopic(dlqTopic) + .retryLetterTopic(retryTopic) + .maxRedeliverCount(3) + .build()) .subscriptionName(properties.getProperty(Constants.CONSUMER_GROUP)) .subscriptionMode(SubscriptionMode.Durable) .subscriptionType(type) @@ -140,6 +153,13 @@ public void commit(EventMeshAction action) { String.format("Failed to unsubscribe the topic:%s with exception: %s", subTopic, ex.getMessage())); } catch (EventDeserializationException ex) { log.warn("The Message isn't json format, with exception:{}", ex.getMessage()); + } catch (Exception e) { + ackConsumer.negativeAcknowledge(msg); + try { + ackConsumer.reconsumeLater(msg, 5, TimeUnit.SECONDS); + } catch (PulsarClientException ex) { + throw new RuntimeException(ex); + } } }) .subscribe();