Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5071] Enhancement for admin server and canal source/sink connector #5072

Merged
merged 5 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eventmesh-admin-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies {
implementation project(":eventmesh-common")
implementation project(":eventmesh-registry:eventmesh-registry-api")
implementation project(":eventmesh-registry:eventmesh-registry-nacos")
implementation project(':eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api')
implementation project(":eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api")
implementation "com.alibaba.nacos:nacos-client"
implementation("org.springframework.boot:spring-boot-starter-web") {
exclude group: "org.springframework.boot", module: "spring-boot-starter-tomcat"
Expand Down
8 changes: 6 additions & 2 deletions eventmesh-admin-server/conf/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ mybatis-plus:
configuration:
map-underscore-to-camel-case: false
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# http server port
server:
port: 8082
event-mesh:
admin-server:
serviceName: DEFAULT_GROUP@@em_adm_server
# grpc server port
port: 8081
adminServerList:
region1:
- http://localhost:8081
region2:
- http://localhost:8082
region2:
- http://localhost:8083
region: region1
7 changes: 4 additions & 3 deletions eventmesh-admin-server/conf/eventmesh.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
`dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`description` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`configuration` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`configurationClass` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`region` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
Expand Down Expand Up @@ -134,13 +135,13 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (

-- export table eventmesh.event_mesh_verify structure
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
`id` int NOT NULL,
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`recordID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`recordSig` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`connectorName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`connectorName` varchar(200) COLLATE utf8mb4_general_ci DEFAULT NULL,
`connectorStage` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`position` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`position` text COLLATE utf8mb4_general_ci DEFAULT NULL,
`createTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<result property="dataType" column="dataType" jdbcType="VARCHAR"/>
<result property="description" column="description" jdbcType="VARCHAR"/>
<result property="configuration" column="configuration" jdbcType="VARCHAR"/>
<result property="configurationClass" column="configurationClass" jdbcType="VARCHAR"/>
<result property="region" column="region" jdbcType="VARCHAR"/>
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
Expand All @@ -37,7 +38,7 @@

<sql id="Base_Column_List">
id,dataType,description,
configuration,region,createUid,updateUid,
createTime,updateTime
configuration,configurationClass,region,
createUid,updateUid,createTime,updateTime
</sql>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class EventMeshDataSource implements Serializable {

private String configuration;

private String configurationClass;

private String region;

private String createUid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) {
}
response.setId(detail.getJobID());
JobConnectorConfig config = new JobConnectorConfig();
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource()));
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource()));
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
response.setConnectorConfig(config);
response.setTransportType(detail.getTransportType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@

@Service
public class DataSourceBizService {

@Autowired
private EventMeshDataSourceService dataSourceService;

public EventMeshDataSource createDataSource(CreateOrUpdateDataSourceReq dataSource) {
EventMeshDataSource entity = new EventMeshDataSource();
entity.setConfiguration(JsonUtils.toJSONString(dataSource.getConfig()));
entity.setConfigurationClass(dataSource.getConfigClass());
entity.setDataType(dataSource.getType().name());
entity.setCreateUid(dataSource.getOperator());
entity.setUpdateUid(dataSource.getOperator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
import org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService;
import org.apache.eventmesh.admin.server.web.service.position.PositionBizService;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.remote.TaskState;
import org.apache.eventmesh.common.remote.TransportType;
import org.apache.eventmesh.common.remote.datasource.DataSource;
Expand Down Expand Up @@ -114,6 +115,7 @@ public List<EventMeshJobInfo> createJobs(List<JobDetail> jobs) {
source.setRegion(job.getSourceDataSource().getRegion());
source.setDesc(job.getSourceConnectorDesc());
source.setConfig(job.getSourceDataSource().getConf());
source.setConfigClass(job.getSourceDataSource().getConfClazz().getName());
EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source);
entity.setSourceData(createdSource.getId());

Expand All @@ -123,6 +125,7 @@ public List<EventMeshJobInfo> createJobs(List<JobDetail> jobs) {
sink.setRegion(job.getSinkDataSource().getRegion());
sink.setDesc(job.getSinkConnectorDesc());
sink.setConfig(job.getSinkDataSource().getConf());
sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName());
EventMeshDataSource createdSink = dataSourceBizService.createDataSource(sink);
entity.setTargetData(createdSink.getId());

Expand All @@ -141,18 +144,22 @@ public JobDetail getJobDetail(String jobID) {
if (jobID == null) {
return null;
}
EventMeshJobInfo job = jobInfoService.getById(jobID);
EventMeshJobInfo job = jobInfoService.getOne(Wrappers.<EventMeshJobInfo>query().eq("jobID", jobID));
if (job == null) {
return null;
}
JobDetail detail = new JobDetail();
detail.setTaskID(job.getTaskID());
detail.setJobID(job.getJobID());
EventMeshDataSource source = dataSourceService.getById(job.getSourceData());
EventMeshDataSource target = dataSourceService.getById(job.getTargetData());
if (source != null) {
if (!StringUtils.isBlank(source.getConfiguration())) {
try {
detail.setSourceDataSource(JsonUtils.parseObject(source.getConfiguration(), DataSource.class));
DataSource sourceDataSource = new DataSource();
Class<?> configClass = Class.forName(source.getConfigurationClass());
sourceDataSource.setConf((Config) JsonUtils.parseObject(source.getConfiguration(), configClass));
detail.setSourceDataSource(sourceDataSource);
} catch (Exception e) {
log.warn("parse source config id [{}] fail", job.getSourceData(), e);
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal source data source config");
Expand All @@ -168,7 +175,10 @@ public JobDetail getJobDetail(String jobID) {
if (target != null) {
if (!StringUtils.isBlank(target.getConfiguration())) {
try {
detail.setSinkDataSource(JsonUtils.parseObject(target.getConfiguration(), DataSource.class));
DataSource sinkDataSource = new DataSource();
Class<?> configClass = Class.forName(target.getConfigurationClass());
sinkDataSource.setConf((Config) JsonUtils.parseObject(target.getConfiguration(), configClass));
detail.setSinkDataSource(sinkDataSource);
} catch (Exception e) {
log.warn("parse sink config id [{}] fail", job.getSourceData(), e);
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal target data sink config");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService;
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.remote.TaskState;
import org.apache.eventmesh.common.remote.datasource.DataSource;
import org.apache.eventmesh.common.remote.datasource.DataSourceType;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
import org.apache.eventmesh.common.utils.JsonUtils;

import org.apache.commons.lang3.StringUtils;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
Expand All @@ -40,6 +45,7 @@

@Service
public class TaskBizService {

@Autowired
private EventMeshTaskInfoService taskInfoService;

Expand Down Expand Up @@ -76,7 +82,12 @@ public String createTask(CreateTaskRequest req) {

String finalTaskID = taskID;
List<JobDetail> jobs = req.getJobs().stream().map(x -> {
JobDetail job = parse(x);
JobDetail job = null;
try {
job = parse(x);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
job.setTaskID(finalTaskID);
job.setCreateUid(req.getUid());
job.setUpdateUid(req.getUid());
Expand All @@ -95,14 +106,30 @@ public String createTask(CreateTaskRequest req) {
return finalTaskID;
}

private JobDetail parse(CreateTaskRequest.JobDetail src) {
private JobDetail parse(CreateTaskRequest.JobDetail src) throws ClassNotFoundException {
JobDetail dst = new JobDetail();
dst.setJobDesc(src.getJobDesc());
dst.setTransportType(src.getTransportType());
dst.setSourceConnectorDesc(src.getSourceConnectorDesc());
dst.setSourceDataSource(src.getSourceDataSource());
Map<String, Object> sourceDataMap = src.getSourceDataSource();
DataSource sourceDataSource = new DataSource();
sourceDataSource.setType(DataSourceType.fromString(sourceDataMap.get("type").toString()));
sourceDataSource.setDesc((String) sourceDataMap.get("desc"));
sourceDataSource.setConfClazz((Class<? extends Config>) Class.forName(sourceDataMap.get("confClazz").toString()));
sourceDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sourceDataMap.get("conf")), sourceDataSource.getConfClazz()));
sourceDataSource.setRegion((String) sourceDataMap.get("region"));
dst.setSourceDataSource(sourceDataSource);

dst.setSinkConnectorDesc(src.getSinkConnectorDesc());
dst.setSinkDataSource(src.getSinkDataSource());
Map<String, Object> sinkDataMap = src.getSinkDataSource();
DataSource sinkDataSource = new DataSource();
sinkDataSource.setType(DataSourceType.fromString(sinkDataMap.get("type").toString()));
sinkDataSource.setDesc((String) sinkDataMap.get("desc"));
sinkDataSource.setConfClazz((Class<? extends Config>) Class.forName(sinkDataMap.get("confClazz").toString()));
sinkDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sinkDataMap.get("conf")), sinkDataSource.getConfClazz()));
sinkDataSource.setRegion((String) sinkDataMap.get("region"));
dst.setSinkDataSource(sinkDataSource);

// full/increase/check
dst.setJobType(src.getJobType());
dst.setFromRegion(src.getFromRegion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,13 @@ public static DataSourceType getDataSourceType(Integer index) {
}
return TYPES[index];
}

public static DataSourceType fromString(String type) {
for (DataSourceType dataSourceType : DataSourceType.values()) {
if (dataSourceType.name().equalsIgnoreCase(type)) {
return dataSourceType;
}
}
throw new IllegalArgumentException("No enum constant for type: " + type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class CreateOrUpdateDataSourceReq extends BaseRemoteRequest {
private DataSourceType type;
private String desc;
private Config config;
private String configClass;
private String region;
private String operator;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.eventmesh.common.remote.request;

import org.apache.eventmesh.common.remote.TransportType;
import org.apache.eventmesh.common.remote.datasource.DataSource;
import org.apache.eventmesh.common.remote.job.JobType;

import java.util.List;
import java.util.Map;

import lombok.Data;

Expand Down Expand Up @@ -61,11 +61,11 @@ public static class JobDetail {
// full/increase/check
private JobType jobType;

private DataSource sourceDataSource;
private Map<String, Object> sourceDataSource;

private String sourceConnectorDesc;

private DataSource sinkDataSource;
private Map<String, Object> sinkDataSource;

private String sinkConnectorDesc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
org.apache.eventmesh.common.remote.request.FetchJobRequest
org.apache.eventmesh.common.remote.response.FetchJobResponse
org.apache.eventmesh.common.remote.request.ReportPositionRequest
org.apache.eventmesh.common.remote.request.ReportVerifyRequest
org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest
org.apache.eventmesh.common.remote.request.FetchPositionRequest
org.apache.eventmesh.common.remote.response.FetchPositionResponse
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import org.apache.eventmesh.connector.canal.model.EventColumn;
import org.apache.eventmesh.connector.canal.model.EventType;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import lombok.Data;

@Data
public class CanalConnectRecord {
public class CanalConnectRecord implements Serializable {

private static final long serialVersionUID = 1L;

private String schemaName;

Expand Down
Loading
Loading