diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java index 4f69f55042..b7f075e6d3 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java @@ -55,4 +55,7 @@ public class SourceConnectorConfig { // extra config, e.g. GitHub secret private Map extraConfig = new HashMap<>(); + + // data consistency enabled, default true + private boolean dataConsistencyEnabled = true; } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java index c6cc90e0e0..4bc365a139 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java @@ -21,6 +21,8 @@ import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; import org.apache.eventmesh.connector.http.util.HttpUtils; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext; +import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import java.net.URI; @@ -111,14 +113,70 @@ public void handle(ConnectRecord record) { // convert ConnectRecord to HttpConnectRecord String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common"); HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); - deliver(url, httpConnectRecord); + // get timestamp and offset + Long timestamp = httpConnectRecord.getData().getTimestamp(); + Map offset = null; + try { + // May throw NullPointerException. + offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap(); + } catch (NullPointerException e) { + // ignore null pointer exception + } + final Map finalOffset = offset; + Future> responseFuture = deliver(url, httpConnectRecord); + responseFuture.onSuccess(res -> { + log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset); + // log the response + if (HttpUtils.is2xxSuccessful(res.statusCode())) { + if (log.isDebugEnabled()) { + log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}", + res.statusCode(), timestamp, finalOffset, res.bodyAsString()); + } else { + log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, + finalOffset); + } + record.getCallback().onSuccess(convertToSendResult(record)); + } else { + if (log.isDebugEnabled()) { + log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}", + res.statusCode(), timestamp, finalOffset, res.bodyAsString()); + } else { + log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, + finalOffset); + } + record.getCallback() + .onException(buildSendExceptionContext(record, new RuntimeException("HTTP response code: " + res.statusCode()))); + } + }).onFailure(err -> { + log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err); + record.getCallback().onException(buildSendExceptionContext(record, err)); + }); + } + } + + private SendResult convertToSendResult(ConnectRecord record) { + SendResult result = new SendResult(); + result.setMessageId(record.getRecordId()); + if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) { + result.setTopic(record.getExtension("topic")); + } + return result; + } + + private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) { + SendExceptionContext sendExceptionContext = new SendExceptionContext(); + sendExceptionContext.setMessageId(record.getRecordId()); + sendExceptionContext.setCause(e); + if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) { + sendExceptionContext.setTopic(record.getExtension("topic")); } + return sendExceptionContext; } /** - * Processes HttpConnectRecord on specified URL while returning its own processing logic. - * This method sends the HttpConnectRecord to the specified URL using the WebClient. + * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified + * URL using the WebClient. * * @param url URI to which the HttpConnectRecord should be sent * @param httpConnectRecord HttpConnectRecord to process @@ -130,48 +188,13 @@ public Future> deliver(URI url, HttpConnectRecord httpConne MultiMap headers = HttpHeaders.headers() .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8") .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8"); - - // get timestamp and offset - Long timestamp = httpConnectRecord.getData().getTimestamp(); - Map offset = null; - try { - // May throw NullPointerException. - offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap(); - } catch (NullPointerException e) { - // ignore null pointer exception - } - final Map finalOffset = offset; - // send the request return this.webClient.post(url.getPath()) .host(url.getHost()) .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort()) .putHeaders(headers) .ssl(Objects.equals(url.getScheme(), "https")) - .sendJson(httpConnectRecord) - .onSuccess(res -> { - log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset); - // log the response - if (HttpUtils.is2xxSuccessful(res.statusCode())) { - if (log.isDebugEnabled()) { - log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}", - res.statusCode(), timestamp, finalOffset, res.bodyAsString()); - } else { - log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, - finalOffset); - } - } else { - if (log.isDebugEnabled()) { - log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}", - res.statusCode(), timestamp, finalOffset, res.bodyAsString()); - } else { - log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, - finalOffset); - } - } - - }) - .onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err)); + .sendJson(httpConnectRecord); } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java index 1ca325b18d..2b2a01a9dd 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; @@ -41,6 +42,7 @@ import io.vertx.ext.web.Router; import io.vertx.ext.web.handler.LoggerHandler; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -52,22 +54,18 @@ public class HttpSourceConnector implements Source, ConnectorCreateService configClass() { @@ -106,7 +104,7 @@ private void doInit() { final Vertx vertx = Vertx.vertx(); final Router router = Router.router(vertx); - final Route route = router.route() + route = router.route() .path(this.sourceConfig.connectorConfig.getPath()) .handler(LoggerHandler.create()); @@ -136,7 +134,15 @@ public void start() { @Override public void commit(ConnectRecord record) { - + if (this.route != null && sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) { + this.route.handler(ctx -> { + // Return 200 OK + ctx.response() + .putHeader("content-type", "application/json") + .setStatusCode(HttpResponseStatus.OK.code()) + .end("{\"status\":\"success\",\"recordId\":\"" + record.getRecordId() + "\"}"); + }); + } } @Override @@ -146,7 +152,15 @@ public String name() { @Override public void onException(ConnectRecord record) { - + if (this.route != null) { + this.route.failureHandler(ctx -> { + log.error("Failed to handle the request, recordId {}. ", record.getRecordId(), ctx.failure()); + // Return Bad Response + ctx.response() + .setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()) + .end("{\"status\":\"failed\",\"recordId\":\"" + record.getRecordId() + "\"}"); + }); + } } @Override diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java index 80e4f0a753..738f045237 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java @@ -45,6 +45,8 @@ public class CommonProtocol implements Protocol { public static final String PROTOCOL_NAME = "Common"; + private SourceConnectorConfig sourceConnectorConfig; + /** * Initialize the protocol * @@ -52,7 +54,7 @@ public class CommonProtocol implements Protocol { */ @Override public void initialize(SourceConnectorConfig sourceConnectorConfig) { - + this.sourceConnectorConfig = sourceConnectorConfig; } /** @@ -77,10 +79,13 @@ public void setHandler(Route route, SynchronizedCircularFifoQueue queue) throw new IllegalStateException("Failed to store the request."); } - // Return 200 OK - ctx.response() - .setStatusCode(HttpResponseStatus.OK.code()) - .end(CommonResponse.success().toJsonStr()); + if (!sourceConnectorConfig.isDataConsistencyEnabled()) { + // Return 200 OK + ctx.response() + .setStatusCode(HttpResponseStatus.OK.code()) + .end(CommonResponse.success().toJsonStr()); + } + }) .failureHandler(ctx -> { log.error("Failed to handle the request. ", ctx.failure());