From 771a18940498f6dc55f25564f7cf83cd4618204b Mon Sep 17 00:00:00 2001
From: mike_xwm
Date: Sun, 4 Aug 2024 22:16:19 +0800
Subject: [PATCH 01/51] [ISSUE #5067] Enhancement for eventmesh-admin-server
(#5068)
* [ISSUE #5040] Support gtid mode for sync data with mysql
* fix conflicts with master
* fix checkstyle error
* [ISSUE #5044] Data synchronization strong verification in mariadb gtid mode
* fix checkstyle error
* [ISSUE #5048] Add report verify request to admin for connector runtime
* fix checkstyle error
* [ISSUE #5052] Enhancement for source\sink connector
* fix checkstyle error
* fix checkstyle error
* [ISSUE #5067] Enhancement for eventmesh-admin-server
---
eventmesh-admin-server/conf/application.yaml | 10 ++-
eventmesh-admin-server/conf/eventmesh.sql | 14 +--
.../conf/mapper/EventMeshJobInfoMapper.xml | 40 ++++-----
.../conf/mapper/EventMeshTaskInfoMapper.xml | 13 +--
.../admin/server/AdminServerProperties.java | 5 ++
.../admin/server/web/HttpServer.java | 9 +-
.../web/db/entity/EventMeshJobInfo.java | 8 +-
.../web/db/entity/EventMeshTaskInfo.java | 10 ++-
.../db/mapper/EventMeshJobInfoExtMapper.java | 18 +++-
.../impl/EventMeshVerifyServiceImpl.java | 39 +++++++++
.../admin/server/web/pojo/JobDetail.java | 8 +-
.../web/service/job/JobInfoBizService.java | 30 ++++---
.../web/service/task/TaskBizService.java | 55 +++++++++---
.../common/remote/TransportType.java | 6 +-
.../common/remote/datasource/DataSource.java | 25 +++---
.../MySqlIncDataSourceSourceConf.java | 85 -------------------
.../request/CreateOrUpdateDataSourceReq.java | 5 +-
.../remote/request/CreateTaskRequest.java | 33 ++++++-
.../runtime/RuntimeInstanceConfig.java | 4 +-
.../runtime/boot/RuntimeInstance.java | 44 +++++-----
.../runtime/connector/ConnectorRuntime.java | 2 +-
.../src/main/resources/runtime.yaml | 2 +
22 files changed, 272 insertions(+), 193 deletions(-)
create mode 100644 eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshVerifyServiceImpl.java
delete mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java
diff --git a/eventmesh-admin-server/conf/application.yaml b/eventmesh-admin-server/conf/application.yaml
index 54795057cb..afbcd4a438 100644
--- a/eventmesh-admin-server/conf/application.yaml
+++ b/eventmesh-admin-server/conf/application.yaml
@@ -28,5 +28,11 @@ mybatis-plus:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
event-mesh:
admin-server:
- service-name: DEFAULT_GROUP@@em_adm_server
- port: 8081
\ No newline at end of file
+ serviceName: DEFAULT_GROUP@@em_adm_server
+ port: 8081
+ adminServerList:
+ region1:
+ - http://localhost:8081
+ region2:
+ - http://localhost:8082
+ region: region1
\ No newline at end of file
diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql
index 82d5c53317..94edbb6fac 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -45,14 +45,15 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
- `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
+ `jobDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`transportType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`sourceData` int NOT NULL DEFAULT '0',
`targetData` int NOT NULL DEFAULT '0',
- `state` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
+ `jobState` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`jobType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `runningRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`updateUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -118,10 +119,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
- `name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
- `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
- `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate',
- `fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
+ `taskName` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
+ `taskDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
+ `taskState` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate',
+ `sourceRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `targetRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
index 02e8806680..a053d1c838 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
@@ -19,31 +19,33 @@
-->
+ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+ "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
- id,jobID,desc,
+ id,jobID,jobDesc,
taskID,transportType,sourceData,
- targetData,state,jobType,
- fromRegion,createTime,updateTime
+ targetData,jobState,jobType,
+ fromRegion,runningRegion,createUid,
+ updateUid,createTime,updateTime
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
index 05b1dc52a0..c3514fd945 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
@@ -26,10 +26,11 @@
-
-
-
-
+
+
+
+
+
@@ -37,8 +38,8 @@
- id,taskID,name,
- desc,state,fromRegion,
+ id,taskID,taskName,
+ taskDesc,taskState,sourceRegion,targetRegion,
createUid,updateUid,createTime,
updateTime
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java
index 2162731e21..612d398078 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java
@@ -17,6 +17,9 @@
package org.apache.eventmesh.admin.server;
+import java.util.List;
+import java.util.Map;
+
import org.springframework.boot.context.properties.ConfigurationProperties;
import lombok.Getter;
@@ -32,4 +35,6 @@ public class AdminServerProperties {
private String configurationPath;
private String configurationFile;
private String serviceName;
+ private Map> adminServerList;
+ private String region;
}
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
index bd896d546c..a5daac881e 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
@@ -24,18 +24,21 @@
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+import com.alibaba.druid.support.json.JSONUtils;
+
@RestController
@RequestMapping("/eventmesh/admin")
public class HttpServer {
@Autowired
private TaskBizService taskService;
- @RequestMapping("/createTask")
- public ResponseEntity> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
+ @RequestMapping(value = "/createTask", method = RequestMethod.POST)
+ public ResponseEntity
*
*
Implementing classes should ensure thread safety and handle HTTP/HTTPS communication efficiently.
* The {@link #start()} method initializes any necessary resources for HTTP/HTTPS communication. The {@link #handle(ConnectRecord)} method processes a
- * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord)} method processes HttpConnectRecord on specified URL
- * while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.
+ * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, HttpConnectRecord, Map)} method processes HttpConnectRecord on specified
+ * URL while returning its own processing logic {@link #stop()} method releases any resources used for HTTP/HTTPS communication.
*
- *
It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord)} method
+ *
It's recommended to handle exceptions gracefully within the {@link #deliver(URI, HttpConnectRecord, Map)} method
* to prevent message loss or processing interruptions.
*/
public interface HttpSinkHandler {
@@ -62,9 +63,10 @@ public interface HttpSinkHandler {
*
* @param url URI to which the HttpConnectRecord should be sent
* @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to be used in processing
* @return processing chain
*/
- Future> deliver(URI url, HttpConnectRecord httpConnectRecord);
+ Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes);
/**
* Cleans up and releases resources used by the HTTP/HTTPS handler. This method should be called when the handler is no longer needed.
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
similarity index 57%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
index 4bc365a139..0907847455 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
@@ -15,23 +15,23 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.handle;
+package org.apache.eventmesh.connector.http.sink.handler.impl;
import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
+import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
+import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
import org.apache.eventmesh.connector.http.util.HttpUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.net.URI;
-import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.vertx.core.Future;
@@ -60,22 +60,13 @@
*/
@Slf4j
@Getter
-public class CommonHttpSinkHandler implements HttpSinkHandler {
-
- private final SinkConnectorConfig connectorConfig;
-
- private final List urls;
+public class CommonHttpSinkHandler extends AbstractHttpSinkHandler {
private WebClient webClient;
public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
- this.connectorConfig = sinkConnectorConfig;
- // Initialize URLs
- String[] urlStrings = sinkConnectorConfig.getUrls();
- this.urls = Arrays.stream(urlStrings)
- .map(URI::create)
- .collect(Collectors.toList());
+ super(sinkConnectorConfig);
}
/**
@@ -91,41 +82,57 @@ public void start() {
* Initializes the WebClient with the provided configuration options.
*/
private void doInitWebClient() {
+ SinkConnectorConfig sinkConnectorConfig = getSinkConnectorConfig();
final Vertx vertx = Vertx.vertx();
WebClientOptions options = new WebClientOptions()
- .setKeepAlive(this.connectorConfig.isKeepAlive())
- .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 1000)
- .setIdleTimeout(this.connectorConfig.getIdleTimeout())
+ .setKeepAlive(sinkConnectorConfig.isKeepAlive())
+ .setKeepAliveTimeout(sinkConnectorConfig.getKeepAliveTimeout() / 1000)
+ .setIdleTimeout(sinkConnectorConfig.getIdleTimeout())
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
- .setConnectTimeout(this.connectorConfig.getConnectionTimeout())
- .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize());
+ .setConnectTimeout(sinkConnectorConfig.getConnectionTimeout())
+ .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize());
this.webClient = WebClient.create(vertx, options);
}
/**
- * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
+ * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified
+ * URL using the WebClient.
*
- * @param record the ConnectRecord to process
+ * @param url URI to which the HttpConnectRecord should be sent
+ * @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to be used in processing
+ * @return processing chain
*/
@Override
- public void handle(ConnectRecord record) {
- for (URI url : this.urls) {
- // convert ConnectRecord to HttpConnectRecord
- String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common");
- HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
- // get timestamp and offset
- Long timestamp = httpConnectRecord.getData().getTimestamp();
- Map offset = null;
- try {
- // May throw NullPointerException.
- offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
- } catch (NullPointerException e) {
- // ignore null pointer exception
- }
- final Map finalOffset = offset;
- Future> responseFuture = deliver(url, httpConnectRecord);
- responseFuture.onSuccess(res -> {
+ public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes) {
+ // create headers
+ MultiMap headers = HttpHeaders.headers()
+ .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8")
+ .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
+
+ // get timestamp and offset
+ Long timestamp = httpConnectRecord.getData().getTimestamp();
+ Map offset = null;
+ try {
+ // May throw NullPointerException.
+ offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
+ } catch (NullPointerException e) {
+ // ignore null pointer exception
+ }
+ final Map finalOffset = offset;
+
+ // send the request
+ return this.webClient.post(url.getPath())
+ .host(url.getHost())
+ .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort())
+ .putHeaders(headers)
+ .ssl(Objects.equals(url.getScheme(), "https"))
+ .sendJson(httpConnectRecord)
+ .onSuccess(res -> {
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset);
+
+ Exception e = null;
+
// log the response
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
if (log.isDebugEnabled()) {
@@ -135,7 +142,6 @@ public void handle(ConnectRecord record) {
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
- record.getCallback().onSuccess(convertToSendResult(record));
} else {
if (log.isDebugEnabled()) {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
@@ -144,14 +150,96 @@ public void handle(ConnectRecord record) {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
- record.getCallback()
- .onException(buildSendExceptionContext(record, new RuntimeException("HTTP response code: " + res.statusCode())));
+
+ e = new RuntimeException("Unexpected HTTP response code: " + res.statusCode());
}
+
+ // try callback
+ tryCallback(httpConnectRecord, e, attributes);
}).onFailure(err -> {
log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err);
- record.getCallback().onException(buildSendExceptionContext(record, err));
+
+ // try callback
+ tryCallback(httpConnectRecord, err, attributes);
});
+ }
+
+ /**
+ * Tries to call the callback based on the result of the request.
+ *
+ * @param httpConnectRecord the HttpConnectRecord to use
+ * @param e the exception thrown during the request, may be null
+ * @param attributes additional attributes to be used in processing
+ */
+ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map attributes) {
+ // get the retry event
+ HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, httpConnectRecord, e);
+
+ // get the multi http request context
+ MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, retryEvent);
+
+ if (multiHttpRequestContext.getRemainingRequests() == 0) {
+ // do callback
+ ConnectRecord record = httpConnectRecord.getData();
+ if (record.getCallback() == null) {
+ if (log.isDebugEnabled()) {
+ log.warn("ConnectRecord callback is null. Ignoring callback. {}", record);
+ } else {
+ log.warn("ConnectRecord callback is null. Ignoring callback.");
+ }
+ return;
+ }
+
+ HttpRetryEvent lastFailedEvent = multiHttpRequestContext.getLastFailedEvent();
+ if (lastFailedEvent == null) {
+ // success
+ record.getCallback().onSuccess(convertToSendResult(record));
+ } else {
+ // failure
+ record.getCallback().onException(buildSendExceptionContext(record, lastFailedEvent.getLastException()));
+ }
+ }
+ }
+
+ /**
+ * Gets and updates the retry event based on the provided attributes and HttpConnectRecord.
+ *
+ * @param attributes the attributes to use
+ * @param httpConnectRecord the HttpConnectRecord to use
+ * @param e the exception thrown during the request, may be null
+ * @return the updated retry event
+ */
+ private HttpRetryEvent getAndUpdateRetryEvent(Map attributes, HttpConnectRecord httpConnectRecord, Throwable e) {
+ // get the retry event
+ HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+ // update the retry event
+ retryEvent.setLastException(e);
+ return retryEvent;
+ }
+
+
+ /**
+ * Gets and updates the multi http request context based on the provided attributes and HttpConnectRecord.
+ *
+ * @param attributes the attributes to use
+ * @param retryEvent the retry event to use
+ * @return the updated multi http request context
+ */
+ private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map attributes, HttpRetryEvent retryEvent) {
+ // get the multi http request context
+ MultiHttpRequestContext multiHttpRequestContext = (MultiHttpRequestContext) attributes.get(MultiHttpRequestContext.NAME);
+
+ if (retryEvent.getLastException() == null || retryEvent.isMaxRetriesReached()) {
+ // decrement the counter
+ multiHttpRequestContext.decrementRemainingRequests();
+
+ // try set failed event
+ if (retryEvent.getLastException() != null) {
+ multiHttpRequestContext.setLastFailedEvent(retryEvent);
+ }
}
+
+ return multiHttpRequestContext;
}
private SendResult convertToSendResult(ConnectRecord record) {
@@ -174,30 +262,6 @@ private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Thr
}
- /**
- * Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified
- * URL using the WebClient.
- *
- * @param url URI to which the HttpConnectRecord should be sent
- * @param httpConnectRecord HttpConnectRecord to process
- * @return processing chain
- */
- @Override
- public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) {
- // create headers
- MultiMap headers = HttpHeaders.headers()
- .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8")
- .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
- // send the request
- return this.webClient.post(url.getPath())
- .host(url.getHost())
- .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort())
- .putHeaders(headers)
- .ssl(Objects.equals(url.getScheme(), "https"))
- .sendJson(httpConnectRecord);
- }
-
-
/**
* Cleans up and releases resources used by the HTTP/HTTPS handler.
*/
@@ -209,6 +273,4 @@ public void stop() {
log.warn("WebClient is null, ignore.");
}
}
-
-
}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
new file mode 100644
index 0000000000..268d0a0d6d
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handler.impl;
+
+import org.apache.eventmesh.connector.http.sink.config.HttpRetryConfig;
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
+import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
+import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+
+import java.net.ConnectException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Map;
+
+import io.vertx.core.Future;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+
+
+/**
+ * HttpSinkHandlerRetryWrapper is a wrapper class for the HttpSinkHandler that provides retry functionality for failed HTTP requests.
+ */
+@Slf4j
+public class HttpSinkHandlerRetryWrapper extends AbstractHttpSinkHandler {
+
+ private final HttpRetryConfig httpRetryConfig;
+
+ private final HttpSinkHandler sinkHandler;
+
+ public HttpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, HttpSinkHandler sinkHandler) {
+ super(sinkConnectorConfig);
+ this.sinkHandler = sinkHandler;
+ this.httpRetryConfig = getSinkConnectorConfig().getRetryConfig();
+ }
+
+ /**
+ * Initializes the WebClient for making HTTP requests based on the provided SinkConnectorConfig.
+ */
+ @Override
+ public void start() {
+ sinkHandler.start();
+ }
+
+
+ /**
+ * Processes HttpConnectRecord on specified URL while returning its own processing logic This method provides the retry power to process the
+ * HttpConnectRecord
+ *
+ * @param url URI to which the HttpConnectRecord should be sent
+ * @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to pass to the processing chain
+ * @return processing chain
+ */
+ @Override
+ public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes) {
+
+ // Build the retry policy
+ RetryPolicy> retryPolicy = RetryPolicy.>builder()
+ .handleIf(e -> e instanceof ConnectException)
+ .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
+ .withMaxRetries(httpRetryConfig.getMaxRetries())
+ .withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
+ .onRetry(event -> {
+ if (log.isDebugEnabled()) {
+ log.warn("Retrying the request to {} for the {} time. {}", url, event.getAttemptCount(), httpConnectRecord);
+ } else {
+ log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount());
+ }
+ // update the retry event
+ HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+ retryEvent.increaseCurrentRetries();
+ })
+ .onFailure(event -> {
+ if (log.isDebugEnabled()) {
+ log.error("Failed to send the request to {} after {} attempts. {}", url, event.getAttemptCount(),
+ httpConnectRecord, event.getException());
+ } else {
+ log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException());
+ }
+ }).build();
+
+ // Handle the ConnectRecord with retry policy
+ Failsafe.with(retryPolicy)
+ .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord, attributes).toCompletionStage());
+
+ return null;
+ }
+
+
+ /**
+ * Cleans up and releases resources used by the HTTP/HTTPS handler.
+ */
+ @Override
+ public void stop() {
+ sinkHandler.stop();
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
similarity index 82%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
index 4e64126a9d..ff8f69d45a 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.handle;
+package org.apache.eventmesh.connector.http.sink.handler.impl;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
@@ -25,13 +25,14 @@
import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage;
-import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.commons.lang3.StringUtils;
import java.net.URI;
import java.time.LocalDateTime;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -61,8 +62,6 @@
@Slf4j
public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
- private final SinkConnectorConfig sinkConnectorConfig;
-
// the configuration for webhook
private final HttpWebhookConfig webhookConfig;
@@ -86,7 +85,7 @@ public boolean isExportDestroyed() {
public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
super(sinkConnectorConfig);
- this.sinkConnectorConfig = sinkConnectorConfig;
+
this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
int maxQueueSize = this.webhookConfig.getMaxStorageSize();
this.receivedDataQueue = new SynchronizedCircularFifoQueue<>(maxQueueSize);
@@ -94,9 +93,6 @@ public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
doInitExportServer();
}
- public SynchronizedCircularFifoQueue getReceivedDataQueue() {
- return receivedDataQueue;
- }
/**
* Initialize the server for exporting the received data
@@ -202,22 +198,6 @@ public void start() {
});
}
- /**
- * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method should be called for each ConnectRecord that needs to be processed.
- *
- * @param record the ConnectRecord to process
- */
- @Override
- public void handle(ConnectRecord record) {
- for (URI url : super.getUrls()) {
- // convert ConnectRecord to HttpConnectRecord
- String type = String.format("%s.%s.%s", this.getConnectorConfig().getConnectorName(), url.getScheme(), "webhook");
- HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
- // handle the HttpConnectRecord
- deliver(url, httpConnectRecord);
- }
- }
-
/**
* Processes HttpConnectRecord on specified URL while returning its own processing logic This method sends the HttpConnectRecord to the specified
@@ -225,30 +205,27 @@ public void handle(ConnectRecord record) {
*
* @param url URI to which the HttpConnectRecord should be sent
* @param httpConnectRecord HttpConnectRecord to process
+ * @param attributes additional attributes to be used in processing
* @return processing chain
*/
@Override
- public Future> deliver(URI url, HttpConnectRecord httpConnectRecord) {
+ public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes) {
// send the request
- Future> responseFuture = super.deliver(url, httpConnectRecord);
+ Future> responseFuture = super.deliver(url, httpConnectRecord, attributes);
// store the received data
return responseFuture.onComplete(arr -> {
- // If open retry, return directly and handled by RetryHttpSinkHandler
- if (sinkConnectorConfig.getRetryConfig().getMaxRetries() > 0) {
- return;
+ // get tryEvent from attributes
+ HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+
+ HttpResponse response = null;
+ if (arr.succeeded()) {
+ response = arr.result();
+ } else {
+ retryEvent.setLastException(arr.cause());
}
- // create ExportMetadataBuilder
- HttpResponse response = arr.succeeded() ? arr.result() : null;
-
- HttpExportMetadata httpExportMetadata = HttpExportMetadata.builder()
- .url(url.toString())
- .code(response != null ? response.statusCode() : -1)
- .message(response != null ? response.statusMessage() : arr.cause().getMessage())
- .receivedTime(LocalDateTime.now())
- .retriedBy(null)
- .uuid(httpConnectRecord.getUuid())
- .retryNum(0)
- .build();
+
+ // create ExportMetadata
+ HttpExportMetadata httpExportMetadata = buildHttpExportMetadata(url, response, httpConnectRecord, retryEvent);
// create ExportRecord
HttpExportRecord exportRecord = new HttpExportRecord(httpExportMetadata, arr.succeeded() ? arr.result().bodyAsString() : null);
@@ -257,6 +234,38 @@ public Future> deliver(URI url, HttpConnectRecord httpConne
});
}
+ /**
+ * Builds the HttpExportMetadata object based on the response, HttpConnectRecord, and HttpRetryEvent.
+ *
+ * @param url the URI to which the HttpConnectRecord was sent
+ * @param response the response received from the URI
+ * @param httpConnectRecord the HttpConnectRecord that was sent
+ * @param retryEvent the SingleHttpRetryEvent that was used for retries
+ * @return the HttpExportMetadata object
+ */
+ private HttpExportMetadata buildHttpExportMetadata(URI url, HttpResponse response, HttpConnectRecord httpConnectRecord,
+ HttpRetryEvent retryEvent) {
+
+ String msg = null;
+ // order of precedence: lastException > response > null
+ if (retryEvent.getLastException() != null) {
+ msg = retryEvent.getLimitedExceptionMessage();
+ retryEvent.setLastException(null);
+ } else if (response != null) {
+ msg = response.statusMessage();
+ }
+
+ return HttpExportMetadata.builder()
+ .url(url.toString())
+ .code(response != null ? response.statusCode() : -1)
+ .message(msg)
+ .receivedTime(LocalDateTime.now())
+ .httpRecordId(httpConnectRecord.getHttpRecordId())
+ .recordId(httpConnectRecord.getData().getRecordId())
+ .retryNum(retryEvent.getCurrentRetries())
+ .build();
+ }
+
/**
* Cleans up and releases resources used by the HTTP/HTTPS handler.
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
index 3e724627c0..7ddba511c4 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
@@ -86,7 +86,7 @@ void before() throws Exception {
JSONObject requestBody = JSON.parseObject(httpRequest.getBodyAsString());
return HttpResponse.response()
.withContentType(MediaType.APPLICATION_JSON)
- .withStatusCode(200)
+ .withStatusCode(HttpStatus.SC_OK)
.withBody(new JSONObject()
.fluentPut("code", 0)
.fluentPut("message", "success")
From 9701f02660ba04ff37bd0b5787b614a3c91d8bfd Mon Sep 17 00:00:00 2001
From: mike_xwm
Date: Wed, 21 Aug 2024 11:26:23 +0800
Subject: [PATCH 07/51] [ISSUE #5079] Enhancement update for admin-server
(#5080)
* [ISSUE #5079] Enhancement update for admin-server
* fix check style error
* fix check style error
---
eventmesh-admin-server/bin/start-admin.sh | 51 +++++++------
eventmesh-admin-server/build.gradle | 2 +
eventmesh-admin-server/conf/application.yaml | 8 +-
eventmesh-admin-server/conf/eventmesh.sql | 2 +-
.../conf/mapper/EventMeshVerifyMapper.xml | 5 +-
.../admin/server/web/HttpServer.java | 23 ++++++
.../admin/server/web/db/DBThreadPool.java | 26 ++++++-
.../server/web/db/entity/EventMeshVerify.java | 3 +
.../handler/impl/FetchJobRequestHandler.java | 2 +-
.../handler/impl/ReportJobRequestHandler.java | 59 +++++++++++++++
.../handler/impl/ReportPositionHandler.java | 2 +
.../web/handler/impl/ReportVerifyHandler.java | 49 +++++++++++-
.../web/service/job/JobInfoBizService.java | 74 ++++++++++++++++++-
.../position/impl/HttpPositionHandler.java | 61 +++++++++++++++
.../web/service/verify/VerifyBizService.java | 2 +
.../eventmesh/common/remote/JobState.java | 52 +++++++++++++
.../common/remote/TransportType.java | 1 +
.../remote/request/ReportJobRequest.java | 37 ++++++++++
.../remote/request/ReportVerifyRequest.java | 2 +
...e.eventmesh.common.remote.payload.IPayload | 1 +
.../offsetmgmt/admin/AdminOffsetService.java | 3 +
21 files changed, 423 insertions(+), 42 deletions(-)
create mode 100644 eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
create mode 100644 eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
diff --git a/eventmesh-admin-server/bin/start-admin.sh b/eventmesh-admin-server/bin/start-admin.sh
index 93c3644397..1633036617 100644
--- a/eventmesh-admin-server/bin/start-admin.sh
+++ b/eventmesh-admin-server/bin/start-admin.sh
@@ -56,34 +56,34 @@ function extract_java_version {
#}
function get_pid {
- local ppid=""
- if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
- ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
- # If the process does not exist, it indicates that the previous process terminated abnormally.
+ local ppid=""
+ if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
+ ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
+ # If the process does not exist, it indicates that the previous process terminated abnormally.
if [ ! -d /proc/$ppid ]; then
# Remove the residual file.
rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output."
ppid=""
fi
- else
- if [[ $OS =~ Msys ]]; then
- # There is a Bug on Msys that may not be able to kill the identified process
- ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
- elif [[ $OS =~ Darwin ]]; then
- # Known problem: grep Java may not be able to accurately identify Java processes
- ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
- else
- if [ $DOCKER ]; then
- # No need to exclude root user in Docker containers.
- ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
- else
+ else
+ if [[ $OS =~ Msys ]]; then
+ # There is a Bug on Msys that may not be able to kill the identified process
+ ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
+ elif [[ $OS =~ Darwin ]]; then
+ # Known problem: grep Java may not be able to accurately identify Java processes
+ ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
+ else
+ if [ $DOCKER ]; then
+ # No need to exclude root user in Docker containers.
+ ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
+ else
# It is required to identify the process as accurately as possible on Linux.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'})
fi
- fi
- fi
- echo "$ppid";
+ fi
+ fi
+ echo "$ppid";
}
#===========================================================================================
@@ -136,8 +136,7 @@ export JAVA_HOME
GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"
-#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
-JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
+JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
JAVA_OPT="${JAVA_OPT} -verbose:gc"
if [[ "$JAVA_VERSION" == "8" ]]; then
@@ -172,7 +171,7 @@ JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"
# echo "proxy is running already"
# exit 9;
# else
-# echo "err pid$pid, rm pid.file"
+# echo "err pid$pid, rm pid.file"
# rm pid.file
# fi
#fi
@@ -183,8 +182,8 @@ if [[ $pid == "ERROR"* ]]; then
exit 9
fi
if [ -n "$pid" ]; then
- echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
- exit 9
+ echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
+ exit 9
fi
make_logs_dir
@@ -193,9 +192,9 @@ echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_H
EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
if [ $DOCKER ]; then
- $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
+ $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
else
- $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
+ $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
fi
exit 0
diff --git a/eventmesh-admin-server/build.gradle b/eventmesh-admin-server/build.gradle
index 1fec2c7c52..95c8fa1372 100644
--- a/eventmesh-admin-server/build.gradle
+++ b/eventmesh-admin-server/build.gradle
@@ -38,6 +38,8 @@ dependencies {
implementation "com.alibaba:druid-spring-boot-starter"
compileOnly 'com.mysql:mysql-connector-j'
compileOnly 'org.projectlombok:lombok'
+ testImplementation 'junit:junit:4.12'
+ testImplementation 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
diff --git a/eventmesh-admin-server/conf/application.yaml b/eventmesh-admin-server/conf/application.yaml
index 274196db60..3d702e579e 100644
--- a/eventmesh-admin-server/conf/application.yaml
+++ b/eventmesh-admin-server/conf/application.yaml
@@ -35,8 +35,8 @@ event-mesh:
# grpc server port
port: 8081
adminServerList:
- region1:
+ R1:
- http://localhost:8082
- region2:
- - http://localhost:8083
- region: region1
\ No newline at end of file
+ R2:
+ - http://localhost:8082
+ region: R1
\ No newline at end of file
diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql
index 986320570a..6e28daca8a 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -102,7 +102,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
- UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
KEY `jobID` (`jobID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
@@ -137,6 +136,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+ `jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`recordID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`recordSig` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`connectorName` varchar(200) COLLATE utf8_bin DEFAULT NULL,
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
index b7b042145a..45727498cc 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
@@ -26,6 +26,7 @@
+
@@ -35,8 +36,8 @@
- id,taskID,recordID,
- recordSig,connectorName,connectorStage,
+ id,taskID,jobID,recordID,
+ recordSig,connectorName,connectorStage,
position,createTime
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
index 12afb3a3d4..2454e9f02c 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
@@ -18,7 +18,9 @@
package org.apache.eventmesh.admin.server.web;
import org.apache.eventmesh.admin.server.web.service.task.TaskBizService;
+import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
import org.apache.eventmesh.common.utils.JsonUtils;
@@ -29,19 +31,40 @@
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+import lombok.extern.slf4j.Slf4j;
+
@RestController
@RequestMapping("/eventmesh/admin")
+@Slf4j
public class HttpServer {
@Autowired
private TaskBizService taskService;
+ @Autowired
+ private VerifyBizService verifyService;
+
@RequestMapping(value = "/createTask", method = RequestMethod.POST)
public ResponseEntity createOrUpdateTask(@RequestBody CreateTaskRequest task) {
+ log.info("receive http proto create task:{}", task);
CreateTaskResponse createTaskResponse = taskService.createTask(task);
+ log.info("receive http proto create task result:{}", createTaskResponse);
return ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse)));
}
+
+ @RequestMapping(value = "/reportVerify", method = RequestMethod.POST)
+ public ResponseEntity reportVerify(@RequestBody ReportVerifyRequest request) {
+ log.info("receive http proto report verify request:{}", request);
+ boolean result = verifyService.reportVerifyRecord(request);
+ log.info("receive http proto report verify result:{}", result);
+ if (result) {
+ return ResponseEntity.ok("report verify success.request:" + JsonUtils.toJSONString(request));
+ } else {
+ return ResponseEntity.internalServerError().body("report verify success.request:" + JsonUtils.toJSONString(request));
+ }
+ }
+
public boolean deleteTask(Long id) {
return false;
}
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
index f1de764967..277ea66656 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
@@ -20,6 +20,7 @@
import org.apache.eventmesh.common.EventMeshThreadFactory;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -39,17 +40,34 @@ public class DBThreadPool {
new LinkedBlockingQueue<>(1000), new EventMeshThreadFactory("admin-server-db"),
new ThreadPoolExecutor.DiscardOldestPolicy());
+
+ private final ScheduledThreadPoolExecutor checkScheduledExecutor =
+ new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new EventMeshThreadFactory("admin-server-check-scheduled"),
+ new ThreadPoolExecutor.DiscardOldestPolicy());
+
@PreDestroy
private void destroy() {
if (!executor.isShutdown()) {
try {
executor.shutdown();
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
- log.info("wait heart beat handler thread pool shutdown timeout, it will shutdown immediately");
+ log.info("wait handler thread pool shutdown timeout, it will shutdown immediately");
executor.shutdownNow();
}
} catch (InterruptedException e) {
- log.warn("wait heart beat handler thread pool shutdown fail");
+ log.warn("wait handler thread pool shutdown fail");
+ }
+ }
+
+ if (!checkScheduledExecutor.isShutdown()) {
+ try {
+ checkScheduledExecutor.shutdown();
+ if (!checkScheduledExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+ log.info("wait scheduled thread pool shutdown timeout, it will shutdown immediately");
+ checkScheduledExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ log.warn("wait scheduled thread pool shutdown fail");
}
}
}
@@ -57,4 +75,8 @@ private void destroy() {
public ThreadPoolExecutor getExecutors() {
return executor;
}
+
+ public ScheduledThreadPoolExecutor getCheckExecutor() {
+ return checkScheduledExecutor;
+ }
}
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
index 5425c5c57b..c5a6c35f8d 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
@@ -32,11 +32,14 @@
@TableName(value = "event_mesh_verify")
@Data
public class EventMeshVerify implements Serializable {
+
@TableId(type = IdType.AUTO)
private Integer id;
private String taskID;
+ private String jobID;
+
private String recordID;
private String recordSig;
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
index b377bcddd8..3392084c28 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
@@ -56,7 +56,7 @@ public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) {
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
- config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
+ config.setSinkConnectorDesc(detail.getSinkConnectorDesc());
response.setConnectorConfig(config);
response.setTransportType(detail.getTransportType());
response.setState(detail.getState());
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
new file mode 100644
index 0000000000..ea836ce7aa
--- /dev/null
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.admin.server.web.handler.impl;
+
+import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
+import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
+import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.exception.ErrorCode;
+import org.apache.eventmesh.common.remote.request.ReportJobRequest;
+import org.apache.eventmesh.common.remote.response.SimpleResponse;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Component
+@Slf4j
+public class ReportJobRequestHandler extends BaseRequestHandler {
+
+ @Autowired
+ JobInfoBizService jobInfoBizService;
+
+ @Override
+ public SimpleResponse handler(ReportJobRequest request, Metadata metadata) {
+ log.info("receive report job request:{}", request);
+ if (StringUtils.isBlank(request.getJobID())) {
+ return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty");
+ }
+ EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(request.getJobID());
+ if (jobInfo == null) {
+ return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, not exist target job,jobID:" + request.getJobID());
+ }
+ boolean result = jobInfoBizService.updateJobState(jobInfo.getJobID(), request.getState());
+ if (result) {
+ return SimpleResponse.success();
+ } else {
+ return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "update job failed.");
+ }
+ }
+}
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
index 5e2a968262..7a30bef80a 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
@@ -37,6 +37,7 @@
@Component
@Slf4j
public class ReportPositionHandler extends BaseRequestHandler {
+
@Autowired
private JobInfoBizService jobInfoBizService;
@@ -48,6 +49,7 @@ public class ReportPositionHandler extends BaseRequestHandler {
+
@Autowired
private VerifyBizService verifyService;
+ @Autowired
+ JobInfoBizService jobInfoBizService;
+
+ @Autowired
+ private AdminServerProperties properties;
+
@Override
protected SimpleResponse handler(ReportVerifyRequest request, Metadata metadata) {
- if (StringUtils.isAnyBlank(request.getTaskID(), request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) {
+ if (StringUtils.isAnyBlank(request.getTaskID(), request.getJobID(), request.getRecordSig(), request.getRecordID(),
+ request.getConnectorStage())) {
log.info("report verify request [{}] illegal", request);
- return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task id, sign, record id or stage is none");
+ return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task id,job id, sign, record id or stage is none");
+ }
+
+ String jobID = request.getJobID();
+ EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(jobID);
+ if (jobInfo == null || StringUtils.isBlank(jobInfo.getFromRegion())) {
+ log.info("report verify job info [{}] illegal", request);
+ return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "job info is null or fromRegion is blank,job id:" + jobID);
}
- return verifyService.reportVerifyRecord(request) ? SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save verify "
+
+ String fromRegion = jobInfo.getFromRegion();
+ String localRegion = properties.getRegion();
+ log.info("report verify request from region:{},localRegion:{},request:{}", fromRegion, localRegion, request);
+ if (fromRegion.equalsIgnoreCase(localRegion)) {
+ return verifyService.reportVerifyRecord(request) ? SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save verify "
+ "request fail");
+ } else {
+ log.info("start transfer report verify to from region admin server. from region:{}", fromRegion);
+ List adminServerList = properties.getAdminServerList().get(fromRegion);
+ if (adminServerList == null || adminServerList.isEmpty()) {
+ throw new RuntimeException("No admin server available for region: " + fromRegion);
+ }
+ String targetUrl = adminServerList.get(new Random().nextInt(adminServerList.size())) + "/eventmesh/admin/reportVerify";
+ RestTemplate restTemplate = new RestTemplate();
+ ResponseEntity response = restTemplate.postForEntity(targetUrl, request, String.class);
+ if (!response.getStatusCode().is2xxSuccessful()) {
+ return SimpleResponse.fail(ErrorCode.INTERNAL_ERR,
+ "save verify request fail,code:" + response.getStatusCode() + ",msg:" + response.getBody());
+ }
+ return SimpleResponse.success();
+ }
}
}
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
index 0657383e23..a8b469d8b7 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
@@ -19,15 +19,19 @@
import org.apache.eventmesh.admin.server.AdminServerProperties;
import org.apache.eventmesh.admin.server.AdminServerRuntimeException;
+import org.apache.eventmesh.admin.server.web.db.DBThreadPool;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
+import org.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
import org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService;
import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService;
import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
+import org.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService;
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
import org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService;
import org.apache.eventmesh.admin.server.web.service.position.PositionBizService;
import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.remote.JobState;
import org.apache.eventmesh.common.remote.TaskState;
import org.apache.eventmesh.common.remote.TransportType;
import org.apache.eventmesh.common.remote.datasource.DataSource;
@@ -38,9 +42,13 @@
import org.apache.commons.lang3.StringUtils;
+import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -75,13 +83,41 @@ public class JobInfoBizService {
@Autowired
private AdminServerProperties properties;
+ @Autowired
+ EventMeshRuntimeHeartbeatService heartbeatService;
+
+ private final long heatBeatPeriod = Duration.ofMillis(5000).toMillis();
+
+ @Autowired
+ DBThreadPool executor;
+
+ @PostConstruct
+ public void init() {
+ log.info("init check job info scheduled task.");
+ executor.getCheckExecutor().scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ checkJobInfo();
+ }
+ }, 10, 10, TimeUnit.SECONDS);
+ }
+
public boolean updateJobState(String jobID, TaskState state) {
if (jobID == null || state == null) {
return false;
}
EventMeshJobInfo jobInfo = new EventMeshJobInfo();
jobInfo.setJobState(state.name());
- return jobInfoService.update(jobInfo, Wrappers.update().eq("jobID", jobID).ne("state", TaskState.DELETE.name()));
+ return jobInfoService.update(jobInfo, Wrappers.update().eq("jobID", jobID).ne("jobState", JobState.DELETE.name()));
+ }
+
+ public boolean updateJobState(String jobID, JobState state) {
+ if (jobID == null || state == null) {
+ return false;
+ }
+ EventMeshJobInfo jobInfo = new EventMeshJobInfo();
+ jobInfo.setJobState(state.name());
+ return jobInfoService.update(jobInfo, Wrappers.update().eq("jobID", jobID).ne("jobState", JobState.DELETE.name()));
}
@Transactional
@@ -114,7 +150,8 @@ public List createJobs(List jobs) {
source.setOperator(job.getCreateUid());
source.setRegion(job.getSourceDataSource().getRegion());
source.setDesc(job.getSourceConnectorDesc());
- source.setConfig(job.getSourceDataSource().getConf());
+ Config sourceConfig = job.getSourceDataSource().getConf();
+ source.setConfig(sourceConfig);
source.setConfigClass(job.getSourceDataSource().getConfClazz().getName());
EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source);
entity.setSourceData(createdSource.getId());
@@ -124,7 +161,8 @@ public List createJobs(List jobs) {
sink.setOperator(job.getCreateUid());
sink.setRegion(job.getSinkDataSource().getRegion());
sink.setDesc(job.getSinkConnectorDesc());
- sink.setConfig(job.getSinkDataSource().getConf());
+ Config sinkConfig = job.getSinkDataSource().getConf();
+ sink.setConfig(sinkConfig);
sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName());
EventMeshDataSource createdSink = dataSourceBizService.createDataSource(sink);
entity.setTargetData(createdSink.getId());
@@ -195,6 +233,36 @@ public JobDetail getJobDetail(String jobID) {
detail.setTransportType(TransportType.getTransportType(job.getTransportType()));
return detail;
}
+
+ public EventMeshJobInfo getJobInfo(String jobID) {
+ if (jobID == null) {
+ return null;
+ }
+ EventMeshJobInfo job = jobInfoService.getOne(Wrappers.query().eq("jobID", jobID));
+ return job;
+ }
+
+ public void checkJobInfo() {
+ List eventMeshJobInfoList = jobInfoService.list(Wrappers.query().eq("jobState", JobState.RUNNING.name()));
+ log.info("start check job info.to check job size:{}", eventMeshJobInfoList.size());
+ for (EventMeshJobInfo jobInfo : eventMeshJobInfoList) {
+ String jobID = jobInfo.getJobID();
+ if (StringUtils.isEmpty(jobID)) {
+ continue;
+ }
+ EventMeshRuntimeHeartbeat heartbeat = heartbeatService.getOne(Wrappers.query().eq("jobID", jobID));
+ if (heartbeat == null) {
+ continue;
+ }
+ // if last heart beat update time have delay three period.print job heart beat delay warn
+ long currentTimeStamp = System.currentTimeMillis();
+ if (currentTimeStamp - heartbeat.getUpdateTime().getTime() > 3 * heatBeatPeriod) {
+ log.warn("current job heart heart has delay.jobID:{},currentTimeStamp:{},last update time:{}", jobID, currentTimeStamp,
+ heartbeat.getUpdateTime());
+ }
+ }
+ }
+
}
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
new file mode 100644
index 0000000000..b8d536f388
--- /dev/null
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.admin.server.web.service.position.impl;
+
+import org.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService;
+import org.apache.eventmesh.admin.server.web.service.position.PositionHandler;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.datasource.DataSourceType;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
+import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Component
+@Slf4j
+public class HttpPositionHandler extends PositionHandler {
+
+ @Autowired
+ EventMeshPositionReporterHistoryService historyService;
+
+ @Override
+ protected DataSourceType getSourceType() {
+ return DataSourceType.HTTP;
+ }
+
+ @Override
+ public boolean handler(ReportPositionRequest request, Metadata metadata) {
+ log.info("receive http position report request:{}", request);
+ // mock wemq postion report store
+ return true;
+ }
+
+ @Override
+ public List handler(FetchPositionRequest request, Metadata metadata) {
+ // mock http position fetch request
+ List recordPositionList = new ArrayList<>();
+ return recordPositionList;
+ }
+}
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
index 74f208b199..e4f08b30cc 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
@@ -26,6 +26,7 @@
@Service
public class VerifyBizService {
+
@Autowired
private EventMeshVerifyService verifyService;
@@ -35,6 +36,7 @@ public boolean reportVerifyRecord(ReportVerifyRequest request) {
verify.setRecordSig(request.getRecordSig());
verify.setPosition(request.getPosition());
verify.setTaskID(request.getTaskID());
+ verify.setJobID(request.getJobID());
verify.setConnectorName(request.getConnectorName());
verify.setConnectorStage(request.getConnectorStage());
return verifyService.save(verify);
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
new file mode 100644
index 0000000000..da9daffe9c
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.remote;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.ToString;
+
+@ToString
+public enum JobState {
+ INIT, RUNNING, COMPLETE, DELETE, FAIL;
+ private static final JobState[] STATES_NUM_INDEX = JobState.values();
+ private static final Map STATES_NAME_INDEX = new HashMap<>();
+
+ static {
+ for (JobState jobState : STATES_NUM_INDEX) {
+ STATES_NAME_INDEX.put(jobState.name(), jobState);
+ }
+ }
+
+ public static JobState fromIndex(Integer index) {
+ if (index == null || index < 0 || index >= STATES_NUM_INDEX.length) {
+ return null;
+ }
+
+ return STATES_NUM_INDEX[index];
+ }
+
+ public static JobState fromIndex(String index) {
+ if (index == null || index.isEmpty()) {
+ return null;
+ }
+
+ return STATES_NAME_INDEX.get(index);
+ }
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
index 82e7bc021d..6b43598398 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
@@ -35,6 +35,7 @@ public enum TransportType {
HTTP_REDIS(DataSourceType.HTTP, DataSourceType.REDIS),
HTTP_ROCKETMQ(DataSourceType.HTTP, DataSourceType.ROCKETMQ),
REDIS_MQ(DataSourceType.REDIS, DataSourceType.ROCKETMQ),
+ HTTP_HTTP(DataSourceType.HTTP, DataSourceType.HTTP),
;
private static final Map INDEX_TYPES = new HashMap<>();
private static final TransportType[] TYPES = TransportType.values();
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
new file mode 100644
index 0000000000..aec33e4616
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.remote.request;
+
+import org.apache.eventmesh.common.remote.JobState;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString
+public class ReportJobRequest extends BaseRemoteRequest {
+
+ private String jobID;
+
+ private JobState state;
+
+ private String address;
+
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
index cd541949f4..bd38881c3d 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
@@ -28,6 +28,8 @@ public class ReportVerifyRequest extends BaseRemoteRequest {
private String taskID;
+ private String jobID;
+
private String recordID;
private String recordSig;
diff --git a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
index 82d5c94dd3..433cf57ed1 100644
--- a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
+++ b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
@@ -16,6 +16,7 @@
org.apache.eventmesh.common.remote.request.FetchJobRequest
org.apache.eventmesh.common.remote.response.FetchJobResponse
org.apache.eventmesh.common.remote.request.ReportPositionRequest
+org.apache.eventmesh.common.remote.request.ReportJobRequest
org.apache.eventmesh.common.remote.request.ReportVerifyRequest
org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest
org.apache.eventmesh.common.remote.request.FetchPositionRequest
diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
index 977661b134..993352a979 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
@@ -112,6 +112,8 @@ public void persist() {
reportPositionRequest.setRecordPositionList(recordToSyncList);
+ log.debug("start report position request: {}", JsonUtils.toJSONString(reportPositionRequest));
+
Metadata metadata = Metadata.newBuilder()
.setType(ReportPositionRequest.class.getSimpleName())
.build();
@@ -121,6 +123,7 @@ public void persist() {
.build())
.build();
requestObserver.onNext(payload);
+ log.debug("end report position request: {}", JsonUtils.toJSONString(reportPositionRequest));
for (Map.Entry entry : recordMap.entrySet()) {
positionStore.remove(entry.getKey());
From 60fb430ad7e2418ce8fd2d0e7b55bb507ea4ff7d Mon Sep 17 00:00:00 2001
From: mike_xwm
Date: Wed, 21 Aug 2024 20:26:35 +0800
Subject: [PATCH 08/51] [ISSUE #5081] Enhancement update for connectors &
admin-server (#5082)
* [ISSUE #5079] Enhancement update for admin-server
* fix check style error
* fix check style error
* [ISSUE #5081] Enhancement update for connectors & admin-server
* fix check style error
* fix check style error
---
.../web/service/job/JobInfoBizService.java | 3 +
.../common/config/ConfigService.java | 11 +-
.../connector/http}/HttpRetryConfig.java | 2 +-
.../connector/http}/HttpSinkConfig.java | 2 +-
.../connector/http}/HttpWebhookConfig.java | 2 +-
.../connector/http}/SinkConnectorConfig.java | 11 +-
.../connector/http/SourceConnectorConfig.java | 6 +-
.../connector/rdb/canal/CanalSinkConfig.java | 25 +-
.../rdb/canal/CanalSinkFullConfig.java | 2 +-
.../rdb/canal/CanalSinkIncrementConfig.java | 50 +
.../rdb/canal/CanalSourceConfig.java | 58 +-
.../rdb/canal/CanalSourceFullConfig.java | 2 +-
.../rdb/canal/CanalSourceIncrementConfig.java | 86 ++
.../eventmesh/common/utils/JsonUtils.java | 11 +
.../SqlBuilderLoadInterceptor.java | 16 +-
.../connector/CanalSinkCheckConnector.java | 406 ++++++++
.../sink/connector/CanalSinkConnector.java | 778 +---------------
.../connector/CanalSinkFullConnector.java | 43 +-
.../CanalSinkIncrementConnector.java | 865 ++++++++++++++++++
.../connector/canal/source/EntryParser.java | 18 +-
.../source/connector/CanalFullProducer.java | 8 +-
.../connector/CanalSourceCheckConnector.java | 186 ++++
.../connector/CanalSourceConnector.java | 319 +------
.../connector/CanalSourceFullConnector.java | 26 +-
.../CanalSourceIncrementConnector.java | 383 ++++++++
.../source/position/CanalFullPositionMgr.java | 4 +-
.../http/sink/HttpSinkConnector.java | 4 +-
.../http/sink/data/HttpConnectRecord.java | 40 +-
.../sink/handler/AbstractHttpSinkHandler.java | 4 +-
.../http/sink/handler/HttpSinkHandler.java | 10 +-
.../handler/impl/CommonHttpSinkHandler.java | 61 +-
.../impl/HttpSinkHandlerRetryWrapper.java | 10 +-
.../handler/impl/WebhookHttpSinkHandler.java | 13 +-
.../http/source/data/WebhookRequest.java | 4 +
.../source/protocol/impl/CommonProtocol.java | 31 +-
.../source/protocol/impl/GitHubProtocol.java | 2 +-
...esh.openconnect.api.ConnectorCreateService | 20 +
.../http/sink/HttpSinkConnectorTest.java | 4 +-
.../api/connector/SinkConnectorContext.java | 7 +
.../api/connector/SourceConnectorContext.java | 3 +
.../offsetmgmt/api/data/DefaultKeyValue.java | 5 +
eventmesh-runtime-v2/build.gradle | 1 +
.../runtime/connector/ConnectorRuntime.java | 101 +-
43 files changed, 2366 insertions(+), 1277 deletions(-)
rename {eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config => eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http}/HttpRetryConfig.java (95%)
rename {eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config => eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http}/HttpSinkConfig.java (94%)
rename {eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config => eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http}/HttpWebhookConfig.java (95%)
rename {eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config => eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http}/SinkConnectorConfig.java (84%)
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkIncrementConfig.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceIncrementConfig.java
create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkCheckConnector.java
create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkIncrementConnector.java
create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceCheckConnector.java
create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceIncrementConnector.java
create mode 100644 eventmesh-connectors/eventmesh-connector-http/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
index a8b469d8b7..76df629e69 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
@@ -37,6 +37,7 @@
import org.apache.eventmesh.common.remote.datasource.DataSource;
import org.apache.eventmesh.common.remote.datasource.DataSourceType;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
+import org.apache.eventmesh.common.remote.job.JobType;
import org.apache.eventmesh.common.remote.request.CreateOrUpdateDataSourceReq;
import org.apache.eventmesh.common.utils.JsonUtils;
@@ -231,6 +232,8 @@ public JobDetail getJobDetail(String jobID) {
}
detail.setState(state);
detail.setTransportType(TransportType.getTransportType(job.getTransportType()));
+ detail.setJobType(JobType.fromIndex(job.getJobType()));
+ detail.setJobDesc(job.getJobDesc());
return detail;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java
index 939c9d8d67..3f3f609a1f 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java
@@ -131,7 +131,7 @@ public T getConfig(ConfigInfo configInfo) throws IOException {
} else {
filePath = path.startsWith(FILE_PATH_PREFIX) ? path.substring(FILE_PATH_PREFIX.length()) : this.configPath + path;
}
-
+ filePath = normalizeFilePath(filePath);
if (filePath.contains(".jar")) {
try (final InputStream inputStream = getClass().getResourceAsStream(Objects.requireNonNull(resourceUrl))) {
if (inputStream == null) {
@@ -152,6 +152,15 @@ public T getConfig(ConfigInfo configInfo) throws IOException {
return (T) object;
}
+ private String normalizeFilePath(String filePath) {
+ if (System.getProperty("os.name").toLowerCase().contains("win")) {
+ if (filePath.startsWith("/")) {
+ filePath = filePath.substring(1);
+ }
+ }
+ return filePath;
+ }
+
private void populateConfig(Object object, Class> clazz, Config config)
throws NoSuchFieldException, IOException, IllegalAccessException {
ConfigInfo configInfo = new ConfigInfo();
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpRetryConfig.java
similarity index 95%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpRetryConfig.java
index 08c3a323e7..319732a875 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpRetryConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.config;
+package org.apache.eventmesh.common.config.connector.http;
import lombok.Data;
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpSinkConfig.java
similarity index 94%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java
rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpSinkConfig.java
index 5997b90b7d..3c429f3355 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpSinkConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.config;
+package org.apache.eventmesh.common.config.connector.http;
import org.apache.eventmesh.common.config.connector.SinkConfig;
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpWebhookConfig.java
similarity index 95%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java
rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpWebhookConfig.java
index f15bac4568..96b9e09826 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpWebhookConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.config;
+package org.apache.eventmesh.common.config.connector.http;
import lombok.Data;
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
similarity index 84%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
index 9bb338cceb..ccebe5a998 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
@@ -15,9 +15,8 @@
* limitations under the License.
*/
-package org.apache.eventmesh.connector.http.sink.config;
+package org.apache.eventmesh.common.config.connector.http;
-import io.vertx.core.http.HttpClientOptions;
import lombok.Data;
@@ -29,19 +28,19 @@ public class SinkConnectorConfig {
private String[] urls;
// keepAlive, default true
- private boolean keepAlive = HttpClientOptions.DEFAULT_KEEP_ALIVE;
+ private boolean keepAlive = true;
// timeunit: ms, default 60000ms
- private int keepAliveTimeout = HttpClientOptions.DEFAULT_KEEP_ALIVE_TIMEOUT * 1000; // Keep units consistent
+ private int keepAliveTimeout = 60 * 1000; // Keep units consistent
// timeunit: ms, default 5000ms, recommended scope: 5000ms - 10000ms
private int connectionTimeout = 5000;
// timeunit: ms, default 5000ms
- private int idleTimeout;
+ private int idleTimeout = 5000;
// maximum number of HTTP/1 connections a client will pool, default 5
- private int maxConnectionPoolSize = HttpClientOptions.DEFAULT_MAX_POOL_SIZE;
+ private int maxConnectionPoolSize = 5;
// retry config
private HttpRetryConfig retryConfig = new HttpRetryConfig();
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
index b7f075e6d3..58d910bf2d 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
@@ -27,7 +27,7 @@ public class SourceConnectorConfig {
private String connectorName;
- private String path;
+ private String path = "/";
private int port;
@@ -51,11 +51,11 @@ public class SourceConnectorConfig {
private int batchSize = 10;
// protocol, default CloudEvent
- private String protocol = "CloudEvent";
+ private String protocol = "Common";
// extra config, e.g. GitHub secret
private Map extraConfig = new HashMap<>();
// data consistency enabled, default true
- private boolean dataConsistencyEnabled = true;
+ private boolean dataConsistencyEnabled = false;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
index 026f33f4fc..c535c7f52a 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java
@@ -18,7 +18,8 @@
package org.apache.eventmesh.common.config.connector.rdb.canal;
import org.apache.eventmesh.common.config.connector.SinkConfig;
-import org.apache.eventmesh.common.remote.job.SyncMode;
+
+import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -27,25 +28,7 @@
@EqualsAndHashCode(callSuper = true)
public class CanalSinkConfig extends SinkConfig {
- // batchSize
- private Integer batchSize = 50;
-
- // enable batch
- private Boolean useBatch = true;
-
- // sink thread size for single channel
- private Integer poolSize = 5;
-
- // sync mode: field/row
- private SyncMode syncMode;
-
- private boolean isGTIDMode = true;
-
- private boolean isMariaDB = true;
-
- // skip sink process exception
- private Boolean skipException = false;
-
- public SinkConnectorConfig sinkConnectorConfig;
+ // used to convert canal full/increment/check connector config
+ private Map sinkConfig;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java
index c2b881df6c..f1d78a65dc 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java
@@ -26,6 +26,6 @@
@Data
@EqualsAndHashCode(callSuper = true)
public class CanalSinkFullConfig extends SinkConfig {
- private SinkConnectorConfig sinkConfig;
+ private SinkConnectorConfig sinkConnectorConfig;
private String zeroDate;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkIncrementConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkIncrementConfig.java
new file mode 100644
index 0000000000..32112a769b
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkIncrementConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.rdb.canal;
+
+import org.apache.eventmesh.common.remote.job.SyncMode;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class CanalSinkIncrementConfig extends CanalSinkConfig {
+
+ // batchSize
+ private Integer batchSize = 50;
+
+ // enable batch
+ private Boolean useBatch = true;
+
+ // sink thread size for single channel
+ private Integer poolSize = 5;
+
+ // sync mode: field/row
+ private SyncMode syncMode;
+
+ private boolean isGTIDMode = true;
+
+ private boolean isMariaDB = true;
+
+ // skip sink process exception
+ private Boolean skipException = false;
+
+ public SinkConnectorConfig sinkConnectorConfig;
+
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
index 8331d32cb7..db17fbe75d 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceConfig.java
@@ -18,11 +18,8 @@
package org.apache.eventmesh.common.config.connector.rdb.canal;
import org.apache.eventmesh.common.config.connector.SourceConfig;
-import org.apache.eventmesh.common.remote.job.SyncConsistency;
-import org.apache.eventmesh.common.remote.job.SyncMode;
-import org.apache.eventmesh.common.remote.offset.RecordPosition;
-import java.util.List;
+import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -31,56 +28,7 @@
@EqualsAndHashCode(callSuper = true)
public class CanalSourceConfig extends SourceConfig {
- private String destination;
+ // used to convert canal full/increment/check connector config
+ private Map sourceConfig;
- private Long canalInstanceId;
-
- private String desc;
-
- private boolean ddlSync = true;
-
- private boolean filterTableError = false;
-
- private Long slaveId;
-
- private Short clientId;
-
- private String serverUUID;
-
- private boolean isMariaDB = true;
-
- private boolean isGTIDMode = true;
-
- private Integer batchSize = 10000;
-
- private Long batchTimeout = -1L;
-
- private String tableFilter;
-
- private String fieldFilter;
-
- private List recordPositions;
-
- // ================================= channel parameter
- // ================================
-
- // enable remedy
- private Boolean enableRemedy = false;
-
- // sync mode: field/row
- private SyncMode syncMode;
-
- // sync consistency
- private SyncConsistency syncConsistency;
-
- // ================================= system parameter
- // ================================
-
- // Column name of the bidirectional synchronization mark
- private String needSyncMarkTableColumnName = "needSync";
-
- // Column value of the bidirectional synchronization mark
- private String needSyncMarkTableColumnValue = "needSync";
-
- private SourceConnectorConfig sourceConnectorConfig;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
index a2ab8ba31d..15398b303a 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
@@ -28,7 +28,7 @@
@Data
@EqualsAndHashCode(callSuper = true)
public class CanalSourceFullConfig extends SourceConfig {
- private SourceConnectorConfig connectorConfig;
+ private SourceConnectorConfig sourceConnectorConfig;
private List startPosition;
private int parallel;
private int flushSize;
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceIncrementConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceIncrementConfig.java
new file mode 100644
index 0000000000..94fe007b5f
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceIncrementConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.config.connector.rdb.canal;
+
+import org.apache.eventmesh.common.remote.job.SyncConsistency;
+import org.apache.eventmesh.common.remote.job.SyncMode;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
+
+import java.util.List;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class CanalSourceIncrementConfig extends CanalSourceConfig {
+
+ private String destination;
+
+ private Long canalInstanceId;
+
+ private String desc;
+
+ private boolean ddlSync = true;
+
+ private boolean filterTableError = false;
+
+ private Long slaveId;
+
+ private Short clientId;
+
+ private String serverUUID;
+
+ private boolean isMariaDB = true;
+
+ private boolean isGTIDMode = true;
+
+ private Integer batchSize = 10000;
+
+ private Long batchTimeout = -1L;
+
+ private String tableFilter;
+
+ private String fieldFilter;
+
+ private List recordPositions;
+
+ // ================================= channel parameter
+ // ================================
+
+ // enable remedy
+ private Boolean enableRemedy = false;
+
+ // sync mode: field/row
+ private SyncMode syncMode;
+
+ // sync consistency
+ private SyncConsistency syncConsistency;
+
+ // ================================= system parameter
+ // ================================
+
+ // Column name of the bidirectional synchronization mark
+ private String needSyncMarkTableColumnName = "needSync";
+
+ // Column value of the bidirectional synchronization mark
+ private String needSyncMarkTableColumnValue = "needSync";
+
+ private SourceConnectorConfig sourceConnectorConfig;
+
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
index 9e9cea304d..f2328541c4 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
@@ -58,6 +58,10 @@ public static T convertValue(Object fromValue, Class toValueType) {
return OBJECT_MAPPER.convertValue(fromValue, toValueType);
}
+ public static T convertValue(Object fromValue, TypeReference toValueTypeRef) {
+ return OBJECT_MAPPER.convertValue(fromValue, toValueTypeRef);
+ }
+
public static T mapToObject(Map map, Class beanClass) {
if (map == null) {
return null;
@@ -177,6 +181,13 @@ public static T parseTypeReferenceObject(String text, TypeReference typeR
}
}
+ public static T parseTypeReferenceObject(Object object, TypeReference typeReference) {
+ if (object == null) {
+ return null;
+ }
+ return convertValue(object, typeReference);
+ }
+
public static T parseTypeReferenceObject(byte[] text, TypeReference typeReference) {
try {
return OBJECT_MAPPER.readValue(text, typeReference);
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
index 0ad07577f9..7d83bd4f3f 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/interceptor/SqlBuilderLoadInterceptor.java
@@ -17,7 +17,7 @@
package org.apache.eventmesh.connector.canal.interceptor;
-import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig;
+import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkIncrementConfig;
import org.apache.eventmesh.connector.canal.CanalConnectRecord;
import org.apache.eventmesh.connector.canal.dialect.DbDialect;
import org.apache.eventmesh.connector.canal.model.EventColumn;
@@ -40,7 +40,7 @@ public class SqlBuilderLoadInterceptor {
@Setter
private DbDialect dbDialect;
- public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) {
+ public boolean before(CanalSinkIncrementConfig sinkConfig, CanalConnectRecord record) {
// build sql
SqlTemplate sqlTemplate = dbDialect.getSqlTemplate();
EventType type = record.getEventType();
@@ -52,12 +52,12 @@ public boolean before(CanalSinkConfig sinkConfig, CanalConnectRecord record) {
if (type.isInsert()) {
sql = sqlTemplate.getMergeSql(schemaName,
- record.getTableName(),
- buildColumnNames(record.getKeys()),
- buildColumnNames(record.getColumns()),
- new String[] {},
- true,
- shardColumns);
+ record.getTableName(),
+ buildColumnNames(record.getKeys()),
+ buildColumnNames(record.getColumns()),
+ new String[] {},
+ true,
+ shardColumns);
} else if (type.isUpdate()) {
boolean existOldKeys = !CollectionUtils.isEmpty(record.getOldKeys());
boolean rowMode = sinkConfig.getSyncMode().isRow();
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkCheckConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkCheckConnector.java
new file mode 100644
index 0000000000..84e01ca85c
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkCheckConnector.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.canal.sink.connector;
+
+import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkFullConfig;
+import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants;
+import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLColumnDef;
+import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef;
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset;
+import org.apache.eventmesh.connector.canal.DatabaseConnection;
+import org.apache.eventmesh.connector.canal.SqlUtils;
+import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr;
+import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.LockSupport;
+
+import com.alibaba.druid.pool.DruidPooledConnection;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class CanalSinkCheckConnector implements Sink, ConnectorCreateService {
+ private CanalSinkFullConfig config;
+ private RdbTableMgr tableMgr;
+ private final DateTimeFormatter dataTimePattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
+
+ @Override
+ public void start() throws Exception {
+ tableMgr.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+
+ }
+
+ @Override
+ public Sink create() {
+ return new CanalSinkCheckConnector();
+ }
+
+ @Override
+ public Class extends Config> configClass() {
+ return CanalSinkFullConfig.class;
+ }
+
+ @Override
+ public void init(Config config) throws Exception {
+ this.config = (CanalSinkFullConfig) config;
+ init();
+ }
+
+ @Override
+ public void init(ConnectorContext connectorContext) throws Exception {
+ this.config = (CanalSinkFullConfig) ((SinkConnectorContext) connectorContext).getSinkConfig();
+ init();
+ }
+
+ private void init() {
+ if (config.getSinkConnectorConfig() == null) {
+ throw new EventMeshException(String.format("[%s] sink config is null", this.getClass()));
+ }
+ DatabaseConnection.sinkConfig = this.config.getSinkConnectorConfig();
+ DatabaseConnection.initSinkConnection();
+ DatabaseConnection.sinkDataSource.setDefaultAutoCommit(false);
+
+ tableMgr = new RdbTableMgr(this.config.getSinkConnectorConfig(), DatabaseConnection.sinkDataSource);
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public void onException(ConnectRecord record) {
+
+ }
+
+ @Override
+ public void put(List sinkRecords) {
+ if (sinkRecords == null || sinkRecords.isEmpty() || sinkRecords.get(0) == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] got sink records are none", this.getClass());
+ }
+ return;
+ }
+ ConnectRecord record = sinkRecords.get(0);
+ List