diff --git a/eventmesh-retry/eventmesh-retry-pulsar/build.gradle b/eventmesh-retry/eventmesh-retry-pulsar/build.gradle new file mode 100644 index 0000000000..fdd22f1bd1 --- /dev/null +++ b/eventmesh-retry/eventmesh-retry-pulsar/build.gradle @@ -0,0 +1,12 @@ +List pulsar = ["org.apache.pulsar:pulsar-client:$pulsar_version"] + +dependencies { + implementation project(":eventmesh-storage-plugin:eventmesh-storage-api") + implementation project(":eventmesh-storage-plugin:eventmesh-storage-pulsar") + implementation project(path: ':eventmesh-retry:eventmesh-retry-api') + implementation pulsar + implementation 'org.projectlombok:lombok:1.18.28' + implementation project(':eventmesh-common') + testImplementation platform('org.junit:junit-bom:5.9.1') + testImplementation 'org.junit.jupiter:junit-jupiter' +} diff --git a/eventmesh-retry/eventmesh-retry-pulsar/gradle.properties b/eventmesh-retry/eventmesh-retry-pulsar/gradle.properties new file mode 100644 index 0000000000..f34fea7244 --- /dev/null +++ b/eventmesh-retry/eventmesh-retry-pulsar/gradle.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +pulsar_version=2.11.1 +pluginType=retry +pluginName=pulsar \ No newline at end of file diff --git a/eventmesh-retry/eventmesh-retry-pulsar/src/main/java/org/apache/eventmesh/retry/pulsar/PulsarRetryStrategyImpl.java b/eventmesh-retry/eventmesh-retry-pulsar/src/main/java/org/apache/eventmesh/retry/pulsar/PulsarRetryStrategyImpl.java new file mode 100644 index 0000000000..ed51602488 --- /dev/null +++ b/eventmesh-retry/eventmesh-retry-pulsar/src/main/java/org/apache/eventmesh/retry/pulsar/PulsarRetryStrategyImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.retry.pulsar; + +import org.apache.eventmesh.api.SendCallback; +import org.apache.eventmesh.api.SendResult; +import org.apache.eventmesh.api.exception.OnExceptionContext; +import org.apache.eventmesh.api.producer.Producer; +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey.ClientInstanceKey; +import org.apache.eventmesh.retry.api.conf.RetryConfiguration; +import org.apache.eventmesh.retry.api.strategy.RetryStrategy; + +import org.apache.pulsar.client.util.RetryMessageUtil; + +import java.util.Objects; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class PulsarRetryStrategyImpl implements RetryStrategy { + @Override + public void retry(final RetryConfiguration configuration) { + sendMessageBack(configuration); + } + + @SneakyThrows + private void sendMessageBack(final RetryConfiguration configuration) { + CloudEvent event = configuration.getEvent(); + String topic = configuration.getTopic(); + String consumerGroupName = configuration.getConsumerGroupName(); + String retryTopicName = consumerGroupName + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; + + String bizSeqNo = Objects.requireNonNull(event.getExtension(ClientInstanceKey.BIZSEQNO.getKey())).toString(); + String uniqueId = Objects.requireNonNull(event.getExtension(ClientInstanceKey.UNIQUEID.getKey())).toString(); + CloudEvent retryEvent = CloudEventBuilder.from(event) + .withExtension(ProtocolKey.TOPIC, topic) + .withSubject(retryTopicName) + .build(); + + Producer producer = configuration.getProducer(); + + producer.publish(retryEvent, new SendCallback() { + @Override + public void onSuccess(final SendResult sendResult) { + log.info("consumer:{} consume success,, bizSeqno:{}, uniqueId:{}", + consumerGroupName, bizSeqNo, uniqueId); + } + + @Override + public void onException(final OnExceptionContext context) { + log.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}", + consumerGroupName, bizSeqNo, uniqueId, context.getException()); + } + }); + } +} diff --git a/eventmesh-retry/eventmesh-retry-pulsar/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.retry.api.strategy.RetryStrategy b/eventmesh-retry/eventmesh-retry-pulsar/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.retry.api.strategy.RetryStrategy new file mode 100644 index 0000000000..5273bcd4e1 --- /dev/null +++ b/eventmesh-retry/eventmesh-retry-pulsar/src/main/resources/META-INF.eventmesh/org.apache.eventmesh.retry.api.strategy.RetryStrategy @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +pulsar=org.apache.eventmesh.retry.pulsar.PulsarRetryStrategyImpl \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 645e6fb365..d602c0b9fa 100644 --- a/settings.gradle +++ b/settings.gradle @@ -126,3 +126,6 @@ include 'eventmesh-webhook:eventmesh-webhook-receive' include 'eventmesh-retry' include 'eventmesh-retry:eventmesh-retry-api' include 'eventmesh-retry:eventmesh-retry-rocketmq' +include 'eventmesh-retry:eventmesh-retry-pulsar' +findProject(':eventmesh-retry:eventmesh-retry-pulsar')?.name = 'eventmesh-retry-pulsar' +