Skip to content

Commit

Permalink
[ISSUE #5069] Enhancement for http source/sink connector (#5070)
Browse files Browse the repository at this point in the history
* [ISSUE #5069] Enhancement for http source/sink connector

* update http source connector & config

* fix checkstyle error
  • Loading branch information
xwm1992 authored Aug 7, 2024
1 parent 771a189 commit a6018dd
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,7 @@ public class SourceConnectorConfig {

// extra config, e.g. GitHub secret
private Map<String, String> extraConfig = new HashMap<>();

// data consistency enabled, default true
private boolean dataConsistencyEnabled = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.util.HttpUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.net.URI;
Expand Down Expand Up @@ -111,14 +113,70 @@ public void handle(ConnectRecord record) {
// convert ConnectRecord to HttpConnectRecord
String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
deliver(url, httpConnectRecord);
// get timestamp and offset
Long timestamp = httpConnectRecord.getData().getTimestamp();
Map<String, ?> offset = null;
try {
// May throw NullPointerException.
offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
} catch (NullPointerException e) {
// ignore null pointer exception
}
final Map<String, ?> finalOffset = offset;
Future<HttpResponse<Buffer>> responseFuture = deliver(url, httpConnectRecord);
responseFuture.onSuccess(res -> {
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset);
// log the response
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
if (log.isDebugEnabled()) {
log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
record.getCallback().onSuccess(convertToSendResult(record));
} else {
if (log.isDebugEnabled()) {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
record.getCallback()
.onException(buildSendExceptionContext(record, new RuntimeException("HTTP response code: " + res.statusCode())));
}
}).onFailure(err -> {
log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err);
record.getCallback().onException(buildSendExceptionContext(record, err));
});
}
}

private SendResult convertToSendResult(ConnectRecord record) {
SendResult result = new SendResult();
result.setMessageId(record.getRecordId());
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
result.setTopic(record.getExtension("topic"));
}
return result;
}

private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) {
SendExceptionContext sendExceptionContext = new SendExceptionContext();
sendExceptionContext.setMessageId(record.getRecordId());
sendExceptionContext.setCause(e);
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
sendExceptionContext.setTopic(record.getExtension("topic"));
}
return sendExceptionContext;
}


/**
* Processes HttpConnectRecord on specified URL while returning its own processing logic.
* This method sends the HttpConnectRecord to the specified URL using the WebClient.
* Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified
* URL using the WebClient.
*
* @param url URI to which the HttpConnectRecord should be sent
* @param httpConnectRecord HttpConnectRecord to process
Expand All @@ -130,48 +188,13 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
MultiMap headers = HttpHeaders.headers()
.set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8")
.set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");

// get timestamp and offset
Long timestamp = httpConnectRecord.getData().getTimestamp();
Map<String, ?> offset = null;
try {
// May throw NullPointerException.
offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
} catch (NullPointerException e) {
// ignore null pointer exception
}
final Map<String, ?> finalOffset = offset;

// send the request
return this.webClient.post(url.getPath())
.host(url.getHost())
.port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort())
.putHeaders(headers)
.ssl(Objects.equals(url.getScheme(), "https"))
.sendJson(httpConnectRecord)
.onSuccess(res -> {
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset);
// log the response
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
if (log.isDebugEnabled()) {
log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
} else {
if (log.isDebugEnabled()) {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
}

})
.onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err));
.sendJson(httpConnectRecord);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.LoggerHandler;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -52,22 +54,18 @@ public class HttpSourceConnector implements Source, ConnectorCreateService<Sourc

private int batchSize;

private Route route;

private Protocol protocol;

private HttpServer server;

@Getter
private volatile boolean started = false;

@Getter
private volatile boolean destroyed = false;

public boolean isStarted() {
return started;
}

public boolean isDestroyed() {
return destroyed;
}


@Override
public Class<? extends Config> configClass() {
Expand Down Expand Up @@ -106,7 +104,7 @@ private void doInit() {

final Vertx vertx = Vertx.vertx();
final Router router = Router.router(vertx);
final Route route = router.route()
route = router.route()
.path(this.sourceConfig.connectorConfig.getPath())
.handler(LoggerHandler.create());

Expand Down Expand Up @@ -136,7 +134,15 @@ public void start() {

@Override
public void commit(ConnectRecord record) {

if (this.route != null && sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
this.route.handler(ctx -> {
// Return 200 OK
ctx.response()
.putHeader("content-type", "application/json")
.setStatusCode(HttpResponseStatus.OK.code())
.end("{\"status\":\"success\",\"recordId\":\"" + record.getRecordId() + "\"}");
});
}
}

@Override
Expand All @@ -146,7 +152,15 @@ public String name() {

@Override
public void onException(ConnectRecord record) {

if (this.route != null) {
this.route.failureHandler(ctx -> {
log.error("Failed to handle the request, recordId {}. ", record.getRecordId(), ctx.failure());
// Return Bad Response
ctx.response()
.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
.end("{\"status\":\"failed\",\"recordId\":\"" + record.getRecordId() + "\"}");
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ public class CommonProtocol implements Protocol {

public static final String PROTOCOL_NAME = "Common";

private SourceConnectorConfig sourceConnectorConfig;

/**
* Initialize the protocol
*
* @param sourceConnectorConfig source connector config
*/
@Override
public void initialize(SourceConnectorConfig sourceConnectorConfig) {

this.sourceConnectorConfig = sourceConnectorConfig;
}

/**
Expand All @@ -77,10 +79,13 @@ public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue)
throw new IllegalStateException("Failed to store the request.");
}

// Return 200 OK
ctx.response()
.setStatusCode(HttpResponseStatus.OK.code())
.end(CommonResponse.success().toJsonStr());
if (!sourceConnectorConfig.isDataConsistencyEnabled()) {
// Return 200 OK
ctx.response()
.setStatusCode(HttpResponseStatus.OK.code())
.end(CommonResponse.success().toJsonStr());
}

})
.failureHandler(ctx -> {
log.error("Failed to handle the request. ", ctx.failure());
Expand Down

0 comments on commit a6018dd

Please sign in to comment.