Skip to content

Commit

Permalink
Remove external dependencies.
Browse files Browse the repository at this point in the history
  • Loading branch information
yanrongzhen committed Nov 3, 2023
1 parent 3d26f22 commit 34f6067
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.connector.spring.source;

import io.openmessaging.api.SendCallback;
import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;

/**
* Operations for sending messages.
Expand All @@ -26,6 +26,6 @@ public interface MessageSendingOperations {

void send(Object message);

void send(Object message, SendCallback sendCallback);
void send(Object message, SendMessageCallback sendCallback);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.eventmesh.connector.spring.source.MessageSendingOperations;
import org.apache.eventmesh.connector.spring.source.config.SpringSourceConfig;
import org.apache.eventmesh.openconnect.SourceWorker;
import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
Expand All @@ -34,8 +35,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import io.openmessaging.api.SendCallback;

import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand Down Expand Up @@ -127,7 +126,7 @@ public void send(Object message) {
* the SourceWorker will fetch message and invoke.
*/
@Override
public void send(Object message, SendCallback workerCallback) {
public void send(Object message, SendMessageCallback workerCallback) {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord record = new ConnectRecord(partition, offset, System.currentTimeMillis(), message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.SystemUtils;
import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext;
import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
import org.apache.eventmesh.openconnect.api.callback.SendResult;
import org.apache.eventmesh.openconnect.api.config.SourceConfig;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
Expand Down Expand Up @@ -59,10 +62,6 @@

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.openmessaging.api.OnExceptionContext;
import io.openmessaging.api.SendCallback;
import io.openmessaging.api.SendResult;
import io.openmessaging.api.exception.OMSRuntimeException;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -188,8 +187,8 @@ public void startPollAndSend() {
// todo: convert connectRecord to cloudevent
CloudEvent event = convertRecordToEvent(connectRecord);
Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = prepareToUpdateRecordOffset(connectRecord);
Optional<SendCallback> sendCallback = Optional.ofNullable(connectRecord.getExtensionObj(CALLBACK_EXTENSION))
.map(v -> (SendCallback) v);
Optional<SendMessageCallback> callback = Optional.ofNullable(connectRecord.getExtensionObj(CALLBACK_EXTENSION))
.map(v -> (SendMessageCallback) v);

int retryTimes = 0;
// retry until MAX_RETRY_TIMES is reached
Expand All @@ -201,15 +200,15 @@ public void startPollAndSend() {
// commit record
this.source.commit(connectRecord);
submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
sendCallback.ifPresent(cb -> cb.onSuccess(convertToSendResult(event)));
callback.ifPresent(cb -> cb.onSuccess(convertToSendResult(event)));
break;
}
throw new EventMeshException("failed to send record.");
} catch (Throwable t) {
retryTimes++;
log.error("{} failed to send record to {}, retry times = {}, failed record {}, throw {}",
this, event.getSubject(), retryTimes, connectRecord, t.getMessage());
sendCallback.ifPresent(cb -> cb.onException(convertToExceptionContext(event, t)));
callback.ifPresent(cb -> cb.onException(convertToExceptionContext(event, t)));
}
}

Expand Down Expand Up @@ -253,11 +252,11 @@ private SendResult convertToSendResult(CloudEvent event) {
return result;
}

private OnExceptionContext convertToExceptionContext(CloudEvent event, Throwable cause) {
OnExceptionContext exceptionContext = new OnExceptionContext();
private SendExcepionContext convertToExceptionContext(CloudEvent event, Throwable cause) {
SendExcepionContext exceptionContext = new SendExcepionContext();
exceptionContext.setTopic(event.getId());
exceptionContext.setMessageId(event.getId());
exceptionContext.setException(new OMSRuntimeException(cause));
exceptionContext.setException(cause);
return exceptionContext;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.api.callback;

public class SendExcepionContext {

private String messageId;
private String topic;
private Throwable cause;

public SendExcepionContext() {
}

public String getMessageId() {
return this.messageId;
}

public void setMessageId(String messageId) {
this.messageId = messageId;
}

public String getTopic() {
return this.topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public Throwable getCause() {
return this.cause;
}

public void setException(Throwable cause) {
this.cause = cause;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.api.callback;

/**
* Message sending callback interface.
*/
public interface SendMessageCallback {

void onSuccess(SendResult sendResult);

void onException(SendExcepionContext sendExcepionContext);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.openconnect.api.callback;

public class SendResult {

private String messageId;
private String topic;

public SendResult() {
}

public String getMessageId() {
return this.messageId;
}

public void setMessageId(String messageId) {
this.messageId = messageId;
}

public String getTopic() {
return this.topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String toString() {
return "SendResult[topic=" + this.topic + ", messageId=" + this.messageId + ']';
}
}

0 comments on commit 34f6067

Please sign in to comment.