Skip to content

Commit

Permalink
Implemented retry strategy based on pulsar message middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
HarshSawarkar committed Feb 12, 2024
1 parent 73ab1d6 commit f1a05cc
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 0 deletions.
12 changes: 12 additions & 0 deletions eventmesh-retry/eventmesh-retry-pulsar/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
List pulsar = ["org.apache.pulsar:pulsar-client:$pulsar_version"]

dependencies {
implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
implementation project(":eventmesh-storage-plugin:eventmesh-storage-pulsar")
implementation project(path: ':eventmesh-retry:eventmesh-retry-api')
implementation pulsar
implementation 'org.projectlombok:lombok:1.18.28'
implementation project(':eventmesh-common')
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
18 changes: 18 additions & 0 deletions eventmesh-retry/eventmesh-retry-pulsar/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
pulsar_version=2.11.1
pluginType=retry
pluginName=pulsar
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.retry.pulsar;

import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.api.producer.Producer;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey.ClientInstanceKey;
import org.apache.eventmesh.retry.api.conf.RetryConfiguration;
import org.apache.eventmesh.retry.api.strategy.RetryStrategy;

import org.apache.pulsar.client.util.RetryMessageUtil;

import java.util.Objects;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PulsarRetryStrategyImpl implements RetryStrategy {
@Override
public void retry(final RetryConfiguration configuration) {
sendMessageBack(configuration);
}

@SneakyThrows
private void sendMessageBack(final RetryConfiguration configuration) {
CloudEvent event = configuration.getEvent();
String topic = configuration.getTopic();
String consumerGroupName = configuration.getConsumerGroupName();
String retryTopicName = consumerGroupName + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;

String bizSeqNo = Objects.requireNonNull(event.getExtension(ClientInstanceKey.BIZSEQNO.getKey())).toString();
String uniqueId = Objects.requireNonNull(event.getExtension(ClientInstanceKey.UNIQUEID.getKey())).toString();
CloudEvent retryEvent = CloudEventBuilder.from(event)
.withExtension(ProtocolKey.TOPIC, topic)
.withSubject(retryTopicName)
.build();

Producer producer = configuration.getProducer();

producer.publish(retryEvent, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
log.info("consumer:{} consume success,, bizSeqno:{}, uniqueId:{}",
consumerGroupName, bizSeqNo, uniqueId);
}

@Override
public void onException(final OnExceptionContext context) {
log.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
consumerGroupName, bizSeqNo, uniqueId, context.getException());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

pulsar=org.apache.eventmesh.retry.pulsar.PulsarRetryStrategyImpl
3 changes: 3 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,6 @@ include 'eventmesh-webhook:eventmesh-webhook-receive'
include 'eventmesh-retry'
include 'eventmesh-retry:eventmesh-retry-api'
include 'eventmesh-retry:eventmesh-retry-rocketmq'
include 'eventmesh-retry:eventmesh-retry-pulsar'
findProject(':eventmesh-retry:eventmesh-retry-pulsar')?.name = 'eventmesh-retry-pulsar'

0 comments on commit f1a05cc

Please sign in to comment.