Skip to content

Commit

Permalink
Implemented retry strategy based on pulsar message middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
HarshSawarkar committed May 18, 2024
1 parent 9db6598 commit 62ba0e8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public void retry(final RetryConfiguration configuration) {
private void sendMessageBack(final RetryConfiguration configuration) {
CloudEvent event = configuration.getEvent();
String topic = configuration.getTopic();
String subscriptionMode = configuration.getSubscriptionMode().getMode();
String retryTopicName = topic + "-" + subscriptionMode + "-" + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
String consumerGroupName = configuration.getConsumerGroupName();
String retryTopicName = consumerGroupName + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;

String bizSeqNo = Objects.requireNonNull(event.getExtension(ClientInstanceKey.BIZSEQNO.getKey())).toString();
String uniqueId = Objects.requireNonNull(event.getExtension(ClientInstanceKey.UNIQUEID.getKey())).toString();
CloudEvent retryEvent = CloudEventBuilder.from(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -41,6 +42,7 @@
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.cloudevents.CloudEvent;
Expand Down Expand Up @@ -122,8 +124,19 @@ public void commit(EventMeshAction action) {

String consumerKey = topic + PulsarConstant.KEY_SEPARATOR + properties.getProperty(Constants.CONSUMER_GROUP)
+ PulsarConstant.KEY_SEPARATOR + properties.getProperty(Constants.CLIENT_ADDRESS);

String dlqTopic = subTopic + "-DLQ";

String retryTopic = subTopic + "-RETRY";

org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(subTopic)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.deadLetterTopic(dlqTopic)
.retryLetterTopic(retryTopic)
.maxRedeliverCount(3)
.build())
.subscriptionName(properties.getProperty(Constants.CONSUMER_GROUP))
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(type)
Expand All @@ -140,6 +153,13 @@ public void commit(EventMeshAction action) {
String.format("Failed to unsubscribe the topic:%s with exception: %s", subTopic, ex.getMessage()));
} catch (EventDeserializationException ex) {
log.warn("The Message isn't json format, with exception:{}", ex.getMessage());
} catch (Exception e) {
ackConsumer.negativeAcknowledge(msg);
try {
ackConsumer.reconsumeLater(msg, 5, TimeUnit.SECONDS);
} catch (PulsarClientException ex) {
throw new RuntimeException(ex);
}
}
})
.subscribe();
Expand Down

0 comments on commit 62ba0e8

Please sign in to comment.