diff --git a/eventmesh-admin-server/build.gradle b/eventmesh-admin-server/build.gradle
index bdb6406da2..1fec2c7c52 100644
--- a/eventmesh-admin-server/build.gradle
+++ b/eventmesh-admin-server/build.gradle
@@ -20,7 +20,7 @@ dependencies {
implementation project(":eventmesh-common")
implementation project(":eventmesh-registry:eventmesh-registry-api")
implementation project(":eventmesh-registry:eventmesh-registry-nacos")
- implementation project(':eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api')
+ implementation project(":eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api")
implementation "com.alibaba.nacos:nacos-client"
implementation("org.springframework.boot:spring-boot-starter-web") {
exclude group: "org.springframework.boot", module: "spring-boot-starter-tomcat"
diff --git a/eventmesh-admin-server/conf/application.yaml b/eventmesh-admin-server/conf/application.yaml
index afbcd4a438..274196db60 100644
--- a/eventmesh-admin-server/conf/application.yaml
+++ b/eventmesh-admin-server/conf/application.yaml
@@ -26,13 +26,17 @@ mybatis-plus:
configuration:
map-underscore-to-camel-case: false
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
+# http server port
+server:
+ port: 8082
event-mesh:
admin-server:
serviceName: DEFAULT_GROUP@@em_adm_server
+ # grpc server port
port: 8081
adminServerList:
region1:
- - http://localhost:8081
- region2:
- http://localhost:8082
+ region2:
+ - http://localhost:8083
region: region1
\ No newline at end of file
diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql
index 94edbb6fac..bdad02a8d5 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -33,6 +33,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
`dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`description` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`configuration` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
+ `configurationClass` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`region` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -134,13 +135,13 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
-- export table eventmesh.event_mesh_verify structure
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
- `id` int NOT NULL,
+ `id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`recordID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`recordSig` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `connectorName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `connectorName` varchar(200) COLLATE utf8mb4_general_ci DEFAULT NULL,
`connectorStage` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
- `position` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+ `position` text COLLATE utf8mb4_general_ci DEFAULT NULL,
`createTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
index d100e19033..50e6ad82cc 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
@@ -28,6 +28,7 @@
+
@@ -37,7 +38,7 @@
id,dataType,description,
- configuration,region,createUid,updateUid,
- createTime,updateTime
+ configuration,configurationClass,region,
+ createUid,updateUid,createTime,updateTime
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
index 9d81366aa5..e6e328984c 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
@@ -41,6 +41,8 @@ public class EventMeshDataSource implements Serializable {
private String configuration;
+ private String configurationClass;
+
private String region;
private String createUid;
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
index 8f159fa45b..b377bcddd8 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
@@ -53,9 +53,9 @@ public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) {
}
response.setId(detail.getJobID());
JobConnectorConfig config = new JobConnectorConfig();
- config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource()));
+ config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
- config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource()));
+ config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
response.setConnectorConfig(config);
response.setTransportType(detail.getTransportType());
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java
index 433847a4cd..4d2d670100 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java
@@ -29,12 +29,14 @@
@Service
public class DataSourceBizService {
+
@Autowired
private EventMeshDataSourceService dataSourceService;
public EventMeshDataSource createDataSource(CreateOrUpdateDataSourceReq dataSource) {
EventMeshDataSource entity = new EventMeshDataSource();
entity.setConfiguration(JsonUtils.toJSONString(dataSource.getConfig()));
+ entity.setConfigurationClass(dataSource.getConfigClass());
entity.setDataType(dataSource.getType().name());
entity.setCreateUid(dataSource.getOperator());
entity.setUpdateUid(dataSource.getOperator());
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
index ea02658481..0657383e23 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
@@ -27,6 +27,7 @@
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
import org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService;
import org.apache.eventmesh.admin.server.web.service.position.PositionBizService;
+import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.remote.TaskState;
import org.apache.eventmesh.common.remote.TransportType;
import org.apache.eventmesh.common.remote.datasource.DataSource;
@@ -114,6 +115,7 @@ public List createJobs(List jobs) {
source.setRegion(job.getSourceDataSource().getRegion());
source.setDesc(job.getSourceConnectorDesc());
source.setConfig(job.getSourceDataSource().getConf());
+ source.setConfigClass(job.getSourceDataSource().getConfClazz().getName());
EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source);
entity.setSourceData(createdSource.getId());
@@ -123,6 +125,7 @@ public List createJobs(List jobs) {
sink.setRegion(job.getSinkDataSource().getRegion());
sink.setDesc(job.getSinkConnectorDesc());
sink.setConfig(job.getSinkDataSource().getConf());
+ sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName());
EventMeshDataSource createdSink = dataSourceBizService.createDataSource(sink);
entity.setTargetData(createdSink.getId());
@@ -141,18 +144,22 @@ public JobDetail getJobDetail(String jobID) {
if (jobID == null) {
return null;
}
- EventMeshJobInfo job = jobInfoService.getById(jobID);
+ EventMeshJobInfo job = jobInfoService.getOne(Wrappers.query().eq("jobID", jobID));
if (job == null) {
return null;
}
JobDetail detail = new JobDetail();
+ detail.setTaskID(job.getTaskID());
detail.setJobID(job.getJobID());
EventMeshDataSource source = dataSourceService.getById(job.getSourceData());
EventMeshDataSource target = dataSourceService.getById(job.getTargetData());
if (source != null) {
if (!StringUtils.isBlank(source.getConfiguration())) {
try {
- detail.setSourceDataSource(JsonUtils.parseObject(source.getConfiguration(), DataSource.class));
+ DataSource sourceDataSource = new DataSource();
+ Class> configClass = Class.forName(source.getConfigurationClass());
+ sourceDataSource.setConf((Config) JsonUtils.parseObject(source.getConfiguration(), configClass));
+ detail.setSourceDataSource(sourceDataSource);
} catch (Exception e) {
log.warn("parse source config id [{}] fail", job.getSourceData(), e);
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal source data source config");
@@ -168,7 +175,10 @@ public JobDetail getJobDetail(String jobID) {
if (target != null) {
if (!StringUtils.isBlank(target.getConfiguration())) {
try {
- detail.setSinkDataSource(JsonUtils.parseObject(target.getConfiguration(), DataSource.class));
+ DataSource sinkDataSource = new DataSource();
+ Class> configClass = Class.forName(target.getConfigurationClass());
+ sinkDataSource.setConf((Config) JsonUtils.parseObject(target.getConfiguration(), configClass));
+ detail.setSinkDataSource(sinkDataSource);
} catch (Exception e) {
log.warn("parse sink config id [{}] fail", job.getSourceData(), e);
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal target data sink config");
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
index f686456135..7089f9cf76 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java
@@ -22,12 +22,17 @@
import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService;
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
+import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.remote.TaskState;
+import org.apache.eventmesh.common.remote.datasource.DataSource;
+import org.apache.eventmesh.common.remote.datasource.DataSourceType;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -40,6 +45,7 @@
@Service
public class TaskBizService {
+
@Autowired
private EventMeshTaskInfoService taskInfoService;
@@ -76,7 +82,12 @@ public String createTask(CreateTaskRequest req) {
String finalTaskID = taskID;
List jobs = req.getJobs().stream().map(x -> {
- JobDetail job = parse(x);
+ JobDetail job = null;
+ try {
+ job = parse(x);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
job.setTaskID(finalTaskID);
job.setCreateUid(req.getUid());
job.setUpdateUid(req.getUid());
@@ -95,14 +106,30 @@ public String createTask(CreateTaskRequest req) {
return finalTaskID;
}
- private JobDetail parse(CreateTaskRequest.JobDetail src) {
+ private JobDetail parse(CreateTaskRequest.JobDetail src) throws ClassNotFoundException {
JobDetail dst = new JobDetail();
dst.setJobDesc(src.getJobDesc());
dst.setTransportType(src.getTransportType());
dst.setSourceConnectorDesc(src.getSourceConnectorDesc());
- dst.setSourceDataSource(src.getSourceDataSource());
+ Map sourceDataMap = src.getSourceDataSource();
+ DataSource sourceDataSource = new DataSource();
+ sourceDataSource.setType(DataSourceType.fromString(sourceDataMap.get("type").toString()));
+ sourceDataSource.setDesc((String) sourceDataMap.get("desc"));
+ sourceDataSource.setConfClazz((Class extends Config>) Class.forName(sourceDataMap.get("confClazz").toString()));
+ sourceDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sourceDataMap.get("conf")), sourceDataSource.getConfClazz()));
+ sourceDataSource.setRegion((String) sourceDataMap.get("region"));
+ dst.setSourceDataSource(sourceDataSource);
+
dst.setSinkConnectorDesc(src.getSinkConnectorDesc());
- dst.setSinkDataSource(src.getSinkDataSource());
+ Map sinkDataMap = src.getSinkDataSource();
+ DataSource sinkDataSource = new DataSource();
+ sinkDataSource.setType(DataSourceType.fromString(sinkDataMap.get("type").toString()));
+ sinkDataSource.setDesc((String) sinkDataMap.get("desc"));
+ sinkDataSource.setConfClazz((Class extends Config>) Class.forName(sinkDataMap.get("confClazz").toString()));
+ sinkDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sinkDataMap.get("conf")), sinkDataSource.getConfClazz()));
+ sinkDataSource.setRegion((String) sinkDataMap.get("region"));
+ dst.setSinkDataSource(sinkDataSource);
+
// full/increase/check
dst.setJobType(src.getJobType());
dst.setFromRegion(src.getFromRegion());
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java
index 985f311b92..8c40971e7b 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java
@@ -61,4 +61,13 @@ public static DataSourceType getDataSourceType(Integer index) {
}
return TYPES[index];
}
+
+ public static DataSourceType fromString(String type) {
+ for (DataSourceType dataSourceType : DataSourceType.values()) {
+ if (dataSourceType.name().equalsIgnoreCase(type)) {
+ return dataSourceType;
+ }
+ }
+ throw new IllegalArgumentException("No enum constant for type: " + type);
+ }
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
index fadfa68e75..f78349703a 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java
@@ -34,6 +34,7 @@ public class CreateOrUpdateDataSourceReq extends BaseRemoteRequest {
private DataSourceType type;
private String desc;
private Config config;
+ private String configClass;
private String region;
private String operator;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
index 47c45595af..c895b5c440 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java
@@ -18,10 +18,10 @@
package org.apache.eventmesh.common.remote.request;
import org.apache.eventmesh.common.remote.TransportType;
-import org.apache.eventmesh.common.remote.datasource.DataSource;
import org.apache.eventmesh.common.remote.job.JobType;
import java.util.List;
+import java.util.Map;
import lombok.Data;
@@ -61,11 +61,11 @@ public static class JobDetail {
// full/increase/check
private JobType jobType;
- private DataSource sourceDataSource;
+ private Map sourceDataSource;
private String sourceConnectorDesc;
- private DataSource sinkDataSource;
+ private Map sinkDataSource;
private String sinkConnectorDesc;
diff --git a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
index 2af95c7510..82d5c94dd3 100644
--- a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
+++ b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
@@ -16,6 +16,7 @@
org.apache.eventmesh.common.remote.request.FetchJobRequest
org.apache.eventmesh.common.remote.response.FetchJobResponse
org.apache.eventmesh.common.remote.request.ReportPositionRequest
+org.apache.eventmesh.common.remote.request.ReportVerifyRequest
org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest
org.apache.eventmesh.common.remote.request.FetchPositionRequest
org.apache.eventmesh.common.remote.response.FetchPositionResponse
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
index 36ecd158f6..6f112081e8 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/CanalConnectRecord.java
@@ -22,13 +22,16 @@
import org.apache.eventmesh.connector.canal.model.EventColumn;
import org.apache.eventmesh.connector.canal.model.EventType;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
@Data
-public class CanalConnectRecord {
+public class CanalConnectRecord implements Serializable {
+
+ private static final long serialVersionUID = 1L;
private String schemaName;
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
index 2ecb2384ac..49fb10dd35 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
@@ -44,6 +44,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.SerializationUtils;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -163,7 +164,11 @@ public void stop() {
public void put(List sinkRecords) {
DbLoadContext context = new DbLoadContext();
for (ConnectRecord connectRecord : sinkRecords) {
- List canalConnectRecordList = (List) connectRecord.getData();
+ List canalConnectRecordList = new ArrayList<>();
+ // deep copy connectRecord data
+ for (CanalConnectRecord record : (List) connectRecord.getData()) {
+ canalConnectRecordList.add(SerializationUtils.clone(record));
+ }
canalConnectRecordList = filterRecord(canalConnectRecordList);
if (isDdlDatas(canalConnectRecordList)) {
doDdl(context, canalConnectRecordList, connectRecord);
@@ -175,7 +180,7 @@ public void put(List sinkRecords) {
DbLoadData loadData = new DbLoadData();
doBefore(canalConnectRecordList, loadData);
- doLoad(context, sinkConfig, loadData);
+ doLoad(context, sinkConfig, loadData, connectRecord);
}
@@ -259,7 +264,7 @@ private void doBefore(List canalConnectRecordList, final DbL
}
}
- private void doLoad(DbLoadContext context, CanalSinkConfig sinkConfig, DbLoadData loadData) {
+ private void doLoad(DbLoadContext context, CanalSinkConfig sinkConfig, DbLoadData loadData, ConnectRecord connectRecord) {
List> batchDatas = new ArrayList<>();
for (TableLoadData tableData : loadData.getTables()) {
if (useBatch) {
@@ -271,7 +276,7 @@ private void doLoad(DbLoadContext context, CanalSinkConfig sinkConfig, DbLoadDat
}
}
- doTwoPhase(context, sinkConfig, batchDatas, true);
+ doTwoPhase(context, sinkConfig, batchDatas, true, connectRecord);
batchDatas.clear();
@@ -289,7 +294,7 @@ private void doLoad(DbLoadContext context, CanalSinkConfig sinkConfig, DbLoadDat
}
}
- doTwoPhase(context, sinkConfig, batchDatas, true);
+ doTwoPhase(context, sinkConfig, batchDatas, true, connectRecord);
batchDatas.clear();
}
@@ -390,7 +395,8 @@ private boolean canBatch(CanalConnectRecord source, CanalConnectRecord target) {
&& StringUtils.equals(source.getSql(), target.getSql());
}
- private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List> totalRows, boolean canBatch) {
+ private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List> totalRows, boolean canBatch,
+ ConnectRecord connectRecord) {
List> results = new ArrayList<>();
for (List rows : totalRows) {
if (CollectionUtils.isEmpty(rows)) {
@@ -404,6 +410,9 @@ private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List<
Exception ex = null;
try {
ex = result.get();
+ if (ex == null) {
+ connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
+ }
} catch (Exception e) {
ex = e;
}
@@ -433,12 +442,14 @@ private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List<
log.warn("skip exception for data : {} , caused by {}",
retryRecord,
ExceptionUtils.getFullStackTrace(ex));
+ connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
} catch (Exception ex) {
// do skip
log.warn("skip exception for data : {} , caused by {}",
retryRecord,
ExceptionUtils.getFullStackTrace(ex));
+ connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
}
} else {
@@ -451,6 +462,7 @@ private void doTwoPhase(DbLoadContext context, CanalSinkConfig sinkConfig, List<
} catch (Exception ex) {
log.error("##load phase two failed!", ex);
log.error("sink connector will shutdown by " + ex.getMessage(), ex);
+ connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, ex));
executor.shutdown();
System.exit(1);
}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 5c4303588d..75572a5faf 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -67,7 +67,7 @@ public static Map> parse(CanalSourceConfig source
}
} else {
// if not gtid mode, need check weather the entry is loopback by specified column value
- needSync = checkNeedSync(sourceConfig, rowChange.getRowDatas(0));
+ needSync = checkNeedSync(sourceConfig, rowChange);
if (needSync) {
transactionDataBuffer.add(entry);
}
@@ -115,9 +115,16 @@ private static void parseRecordListWithEntryBuffer(CanalSourceConfig sourceConfi
}
}
- private static boolean checkNeedSync(CanalSourceConfig sourceConfig, RowData rowData) {
- Column markedColumn = getColumnIgnoreCase(rowData.getAfterColumnsList(),
- sourceConfig.getNeedSyncMarkTableColumnName());
+ private static boolean checkNeedSync(CanalSourceConfig sourceConfig, RowChange rowChange) {
+ Column markedColumn = null;
+ CanalEntry.EventType eventType = rowChange.getEventType();
+ if (eventType.equals(CanalEntry.EventType.DELETE) || eventType.equals(CanalEntry.EventType.UPDATE)) {
+ markedColumn = getColumnIgnoreCase(rowChange.getRowDatas(0).getBeforeColumnsList(),
+ sourceConfig.getNeedSyncMarkTableColumnName());
+ } else if (eventType.equals(CanalEntry.EventType.INSERT)) {
+ markedColumn = getColumnIgnoreCase(rowChange.getRowDatas(0).getAfterColumnsList(),
+ sourceConfig.getNeedSyncMarkTableColumnName());
+ }
if (markedColumn != null) {
return StringUtils.equalsIgnoreCase(markedColumn.getValue(),
sourceConfig.getNeedSyncMarkTableColumnValue());
diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
index 08270fc024..977661b134 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
@@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Random;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -236,7 +237,7 @@ public void initialize(OffsetStorageConfig offsetStorageConfig) {
this.dataSourceType = offsetStorageConfig.getDataSourceType();
this.dataSinkType = offsetStorageConfig.getDataSinkType();
- this.adminServerAddr = offsetStorageConfig.getOffsetStorageAddr();
+ this.adminServerAddr = getRandomAdminServerAddr(offsetStorageConfig.getOffsetStorageAddr());
this.channel = ManagedChannelBuilder.forTarget(adminServerAddr)
.usePlaintext()
.build();
@@ -274,4 +275,14 @@ public void onCompleted() {
this.jobState = TaskState.RUNNING;
this.jobId = offsetStorageConfig.getExtensions().get("jobId");
}
+
+ private String getRandomAdminServerAddr(String adminServerAddrList) {
+ String[] addresses = adminServerAddrList.split(";");
+ if (addresses.length == 0) {
+ throw new IllegalArgumentException("Admin server address list is empty");
+ }
+ Random random = new Random();
+ int randomIndex = random.nextInt(addresses.length);
+ return addresses[randomIndex];
+ }
}
diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index b3fc4346c4..0a41e18f7c 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -26,21 +26,30 @@
import java.util.Set;
import java.util.UUID;
+import lombok.Getter;
+import lombok.Setter;
+
/**
* SourceDataEntries are generated by SourceTasks and passed to specific message queue to store.
*/
+@Getter
public class ConnectRecord {
private final String recordId = UUID.randomUUID().toString();
+ @Setter
private Long timestamp;
+ @Setter
private Object data;
+ @Setter
private RecordPosition position;
+ @Setter
private KeyValue extensions;
+ @Setter
private SendMessageCallback callback;
public ConnectRecord() {
@@ -63,42 +72,6 @@ public ConnectRecord(RecordPartition recordPartition, RecordOffset recordOffset,
this.data = data;
}
- public String getRecordId() {
- return recordId;
- }
-
- public Long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(Long timestamp) {
- this.timestamp = timestamp;
- }
-
- public Object getData() {
- return data;
- }
-
- public void setData(Object data) {
- this.data = data;
- }
-
- public KeyValue getExtensions() {
- return extensions;
- }
-
- public void setExtensions(KeyValue extensions) {
- this.extensions = extensions;
- }
-
- public RecordPosition getPosition() {
- return position;
- }
-
- public void setPosition(RecordPosition position) {
- this.position = position;
- }
-
public void addExtension(KeyValue extensions) {
if (this.extensions == null) {
this.extensions = new DefaultKeyValue();
@@ -137,14 +110,6 @@ public Object getExtensionObj(String key) {
return this.extensions.getObject(key);
}
- public SendMessageCallback getCallback() {
- return callback;
- }
-
- public void setCallback(SendMessageCallback callback) {
- this.callback = callback;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
index acea321e95..beb1d1eedc 100644
--- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
+++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java
@@ -76,6 +76,8 @@ public void init() throws Exception {
}
// use registry adminServiceAddr value replace config
runtimeInstanceConfig.setAdminServiceAddr(adminServiceAddr);
+ } else {
+ adminServiceAddr = runtimeInstanceConfig.getAdminServiceAddr();
}
runtimeFactory = initRuntimeFactory(runtimeInstanceConfig);
@@ -84,23 +86,25 @@ public void init() throws Exception {
}
public void start() throws Exception {
- if (!StringUtils.isBlank(adminServiceAddr) && registryService != null) {
- registryService.subscribe((event) -> {
- log.info("runtime receive registry event: {}", event);
- List registerServerInfoList = event.getInstances();
- Map registerServerInfoMap = new HashMap<>();
- for (RegisterServerInfo registerServerInfo : registerServerInfoList) {
- registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo);
- }
- if (!registerServerInfoMap.isEmpty()) {
- adminServerInfoMap = registerServerInfoMap;
- updateAdminServerAddr();
- }
- }, runtimeInstanceConfig.getAdminServiceName());
+ if (StringUtils.isBlank(adminServiceAddr)) {
+ throw new RuntimeException("admin server address is empty, please check");
+ } else {
+ if (registryService != null) {
+ registryService.subscribe((event) -> {
+ log.info("runtime receive registry event: {}", event);
+ List registerServerInfoList = event.getInstances();
+ Map registerServerInfoMap = new HashMap<>();
+ for (RegisterServerInfo registerServerInfo : registerServerInfoList) {
+ registerServerInfoMap.put(registerServerInfo.getAddress(), registerServerInfo);
+ }
+ if (!registerServerInfoMap.isEmpty()) {
+ adminServerInfoMap = registerServerInfoMap;
+ updateAdminServerAddr();
+ }
+ }, runtimeInstanceConfig.getAdminServiceName());
+ }
runtime.start();
isStarted = true;
- } else {
- throw new RuntimeException("admin server address is empty, please check");
}
}
diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 1e589ebd97..501f222fd3 100644
--- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -63,9 +63,12 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -132,6 +135,8 @@ public class ConnectorRuntime implements Runtime {
public static final String CALLBACK_EXTENSION = "callBackExtension";
+ private String adminServerAddr;
+
public ConnectorRuntime(RuntimeInstanceConfig runtimeInstanceConfig) {
this.runtimeInstanceConfig = runtimeInstanceConfig;
@@ -149,8 +154,9 @@ public void init() throws Exception {
}
private void initAdminService() {
+ adminServerAddr = getRandomAdminServerAddr(runtimeInstanceConfig.getAdminServiceAddr());
// create gRPC channel
- channel = ManagedChannelBuilder.forTarget(runtimeInstanceConfig.getAdminServiceAddr()).usePlaintext().build();
+ channel = ManagedChannelBuilder.forTarget(adminServerAddr).usePlaintext().build();
adminServiceStub = AdminServiceGrpc.newStub(channel).withWaitForReady();
@@ -176,6 +182,16 @@ public void onCompleted() {
requestObserver = adminServiceStub.invokeBiStream(responseObserver);
}
+ private String getRandomAdminServerAddr(String adminServerAddrList) {
+ String[] addresses = adminServerAddrList.split(";");
+ if (addresses.length == 0) {
+ throw new IllegalArgumentException("Admin server address list is empty");
+ }
+ Random random = new Random();
+ int randomIndex = random.nextInt(addresses.length);
+ return addresses[randomIndex];
+ }
+
private void initStorageService() {
// TODO: init producer & consumer
producer = StoragePluginFactory.getMeshMQProducer(runtimeInstanceConfig.getStoragePluginType());
@@ -202,25 +218,18 @@ private void initConnectorService() throws Exception {
connectorRuntimeConfig.setSinkConnectorDesc(jobResponse.getConnectorConfig().getSinkConnectorDesc());
connectorRuntimeConfig.setSinkConnectorConfig(jobResponse.getConnectorConfig().getSinkConnectorConfig());
- ConnectorCreateService> sourceConnectorCreateService =
- ConnectorPluginFactory.createConnector(connectorRuntimeConfig.getSourceConnectorType() + "-Source");
- sourceConnector = (Source) sourceConnectorCreateService.create();
-
- SourceConfig sourceConfig = (SourceConfig) ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(), sourceConnector.configClass());
- SourceConnectorContext sourceConnectorContext = new SourceConnectorContext();
- sourceConnectorContext.setSourceConfig(sourceConfig);
- sourceConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig());
- sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
- if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
- sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
- }
-
// spi load offsetMgmtService
this.offsetManagement = new RecordOffsetManagement();
this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
- OffsetStorageConfig offsetStorageConfig = sourceConfig.getOffsetStorageConfig();
+ OffsetStorageConfig offsetStorageConfig = new OffsetStorageConfig();
+ offsetStorageConfig.setOffsetStorageAddr(connectorRuntimeConfig.getRuntimeConfig().get("offsetStorageAddr").toString());
+ offsetStorageConfig.setOffsetStorageType(connectorRuntimeConfig.getRuntimeConfig().get("offsetStoragePluginType").toString());
offsetStorageConfig.setDataSourceType(jobResponse.getTransportType().getSrc());
offsetStorageConfig.setDataSinkType(jobResponse.getTransportType().getDst());
+ Map offsetStorageExtensions = new HashMap<>();
+ offsetStorageExtensions.put("jobId", connectorRuntimeConfig.getJobID());
+ offsetStorageConfig.setExtensions(offsetStorageExtensions);
+
this.offsetManagementService = Optional.ofNullable(offsetStorageConfig).map(OffsetStorageConfig::getOffsetStorageType)
.map(storageType -> EventMeshExtensionFactory.getExtension(OffsetManagementService.class, storageType))
.orElse(new DefaultOffsetManagementServiceImpl());
@@ -228,6 +237,18 @@ private void initConnectorService() throws Exception {
this.offsetStorageWriter = new OffsetStorageWriterImpl(offsetManagementService);
this.offsetStorageReader = new OffsetStorageReaderImpl(offsetManagementService);
+ ConnectorCreateService> sourceConnectorCreateService =
+ ConnectorPluginFactory.createConnector(connectorRuntimeConfig.getSourceConnectorType() + "-Source");
+ sourceConnector = (Source) sourceConnectorCreateService.create();
+
+ SourceConfig sourceConfig = (SourceConfig) ConfigUtil.parse(connectorRuntimeConfig.getSourceConnectorConfig(), sourceConnector.configClass());
+ SourceConnectorContext sourceConnectorContext = new SourceConnectorContext();
+ sourceConnectorContext.setSourceConfig(sourceConfig);
+ sourceConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig());
+ sourceConnectorContext.setOffsetStorageReader(offsetStorageReader);
+ if (CollectionUtils.isNotEmpty(jobResponse.getPosition())) {
+ sourceConnectorContext.setRecordPositionList(jobResponse.getPosition());
+ }
sourceConnector.init(sourceConnectorContext);
ConnectorCreateService> sinkConnectorCreateService =
@@ -330,6 +351,9 @@ private void startSourceConnector() throws Exception {
// TODO: use producer pub record to storage replace below
if (connectorRecordList != null && !connectorRecordList.isEmpty()) {
for (ConnectRecord record : connectorRecordList) {
+
+ queue.put(record);
+
// if enabled incremental data reporting consistency check
if (connectorRuntimeConfig.enableIncrementalDataConsistencyCheck) {
reportVerifyRequest(record, connectorRuntimeConfig, ConnectorStage.SOURCE);
@@ -363,8 +387,6 @@ public void onException(SendExceptionContext sendExceptionContext) {
}
});
- queue.put(record);
-
offsetManagement.awaitAllMessages(5000, TimeUnit.MILLISECONDS);
// update & commit offset
updateCommittableOffsets();
diff --git a/eventmesh-runtime-v2/src/main/resources/connector.yaml b/eventmesh-runtime-v2/src/main/resources/connector.yaml
index 2e79e5cedc..3e407fa3e9 100644
--- a/eventmesh-runtime-v2/src/main/resources/connector.yaml
+++ b/eventmesh-runtime-v2/src/main/resources/connector.yaml
@@ -15,7 +15,9 @@
# limitations under the License.
#
-taskID: 1
-jobID: 1
+taskID: 9c18a0d2-7a61-482c-8275-34f8c2786cea
+jobID: a01fd5e1-d295-4b89-99bc-0ae23eb85acf
region: region1
runtimeConfig: # this used for connector runtime config
+ offsetStoragePluginType: admin
+ offsetStorageAddr: "127.0.0.1:8081;127.0.0.1:8081"
\ No newline at end of file
diff --git a/eventmesh-runtime-v2/src/main/resources/runtime.yaml b/eventmesh-runtime-v2/src/main/resources/runtime.yaml
index c5ffac9d92..9ac36f27b0 100644
--- a/eventmesh-runtime-v2/src/main/resources/runtime.yaml
+++ b/eventmesh-runtime-v2/src/main/resources/runtime.yaml
@@ -21,4 +21,4 @@ registryServerAddr: 127.0.0.1:8085
registryPluginType: nacos
storagePluginType: memory
adminServiceName: eventmesh-admin
-adminServiceAddr: "127.0.0.1:8085;127.0.0.1:8086"
+adminServiceAddr: "127.0.0.1:8081;127.0.0.1:8081"