diff --git a/eventmesh-admin-server/conf/application.yaml b/eventmesh-admin-server/conf/application.yaml index 54795057cb..afbcd4a438 100644 --- a/eventmesh-admin-server/conf/application.yaml +++ b/eventmesh-admin-server/conf/application.yaml @@ -28,5 +28,11 @@ mybatis-plus: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl event-mesh: admin-server: - service-name: DEFAULT_GROUP@@em_adm_server - port: 8081 \ No newline at end of file + serviceName: DEFAULT_GROUP@@em_adm_server + port: 8081 + adminServerList: + region1: + - http://localhost:8081 + region2: + - http://localhost:8082 + region: region1 \ No newline at end of file diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql index 82d5c53317..94edbb6fac 100644 --- a/eventmesh-admin-server/conf/eventmesh.sql +++ b/eventmesh-admin-server/conf/eventmesh.sql @@ -45,14 +45,15 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` ( CREATE TABLE IF NOT EXISTS `event_mesh_job_info` ( `id` int unsigned NOT NULL AUTO_INCREMENT, `jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, - `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, + `jobDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, `taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `transportType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `sourceData` int NOT NULL DEFAULT '0', `targetData` int NOT NULL DEFAULT '0', - `state` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `jobState` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `jobType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `fromRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `runningRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, `createUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, `updateUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -118,10 +119,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` ( CREATE TABLE IF NOT EXISTS `event_mesh_task_info` ( `id` int unsigned NOT NULL AUTO_INCREMENT, `taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, - `name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, - `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, - `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate', - `fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `taskName` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `taskDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, + `taskState` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate', + `sourceRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `targetRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, `createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, diff --git a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml index 02e8806680..a053d1c838 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml @@ -19,31 +19,33 @@ --> + PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" + "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> - - - - - - - - - - - - - - + + + + + + + + + + + + + + + - id,jobID,desc, + id,jobID,jobDesc, taskID,transportType,sourceData, - targetData,state,jobType, - fromRegion,createTime,updateTime + targetData,jobState,jobType, + fromRegion,runningRegion,createUid, + updateUid,createTime,updateTime diff --git a/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml index 05b1dc52a0..c3514fd945 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml @@ -26,10 +26,11 @@ - - - - + + + + + @@ -37,8 +38,8 @@ - id,taskID,name, - desc,state,fromRegion, + id,taskID,taskName, + taskDesc,taskState,sourceRegion,targetRegion, createUid,updateUid,createTime, updateTime diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java index 2162731e21..612d398078 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java @@ -17,6 +17,9 @@ package org.apache.eventmesh.admin.server; +import java.util.List; +import java.util.Map; + import org.springframework.boot.context.properties.ConfigurationProperties; import lombok.Getter; @@ -32,4 +35,6 @@ public class AdminServerProperties { private String configurationPath; private String configurationFile; private String serviceName; + private Map> adminServerList; + private String region; } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java index bd896d546c..a5daac881e 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java @@ -24,18 +24,21 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; +import com.alibaba.druid.support.json.JSONUtils; + @RestController @RequestMapping("/eventmesh/admin") public class HttpServer { @Autowired private TaskBizService taskService; - @RequestMapping("/createTask") - public ResponseEntity> createOrUpdateTask(@RequestBody CreateTaskRequest task) { + @RequestMapping(value = "/createTask", method = RequestMethod.POST) + public ResponseEntity createOrUpdateTask(@RequestBody CreateTaskRequest task) { String uuid = taskService.createTask(task); - return ResponseEntity.ok(Response.success(uuid)); + return ResponseEntity.ok(JSONUtils.toJSONString(Response.success(uuid))); } public boolean deleteTask(Long id) { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java index 23db5f6c2b..a77eaaaca2 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java @@ -37,7 +37,7 @@ public class EventMeshJobInfo implements Serializable { private String jobID; - private String desc; + private String jobDesc; private String taskID; @@ -47,12 +47,16 @@ public class EventMeshJobInfo implements Serializable { private Integer targetData; - private String state; + private String jobState; private String jobType; + // job request from region private String fromRegion; + // job actually running region + private String runningRegion; + private String createUid; private String updateUid; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java index 5d1b6648c9..2d40f4a082 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java @@ -37,13 +37,15 @@ public class EventMeshTaskInfo implements Serializable { private String taskID; - private String name; + private String taskName; - private String desc; + private String taskDesc; - private String state; + private String taskState; - private String fromRegion; + private String sourceRegion; + + private String targetRegion; private String createUid; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java index 7f46dcab41..c04c4e3748 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoExtMapper.java @@ -21,11 +21,12 @@ import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Mapper; -import org.apache.ibatis.annotations.Options; import org.apache.ibatis.annotations.Param; import java.util.List; +import org.springframework.transaction.annotation.Transactional; + import com.baomidou.mybatisplus.core.mapper.BaseMapper; /** @@ -33,9 +34,18 @@ */ @Mapper public interface EventMeshJobInfoExtMapper extends BaseMapper { - @Insert("insert into event_mesh_job_info(`taskID`,`state`,`jobType`) values" - + "(#{job.taskID},#{job.state},#{job.jobType})") - @Options(useGeneratedKeys = true, keyProperty = "jobID") + + @Insert("") + @Transactional(rollbackFor = Exception.class) int saveBatch(@Param("jobs") List jobInfoList); } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshVerifyServiceImpl.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshVerifyServiceImpl.java new file mode 100644 index 0000000000..5e49ba32ea --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshVerifyServiceImpl.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.service.impl; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify; +import org.apache.eventmesh.admin.server.web.db.mapper.EventMeshVerifyMapper; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshVerifyService; + +import org.springframework.stereotype.Service; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; + +/** + * event_mesh_verify + */ +@Service +public class EventMeshVerifyServiceImpl extends ServiceImpl + implements EventMeshVerifyService { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java index c47b284483..0e2fa64878 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java @@ -34,7 +34,7 @@ public class JobDetail { private String jobID; - private String desc; + private String jobDesc; private String taskID; @@ -50,7 +50,11 @@ public class JobDetail { private String updateUid; - private String region; + // job request from region + private String fromRegion; + + // job actually running region + private String runningRegion; private DataSource sourceDataSource; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java index 9affa10e60..ea02658481 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.admin.server.web.service.job; +import org.apache.eventmesh.admin.server.AdminServerProperties; import org.apache.eventmesh.admin.server.AdminServerRuntimeException; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; @@ -70,12 +71,15 @@ public class JobInfoBizService { @Autowired private PositionBizService positionBizService; + @Autowired + private AdminServerProperties properties; + public boolean updateJobState(String jobID, TaskState state) { if (jobID == null || state == null) { return false; } EventMeshJobInfo jobInfo = new EventMeshJobInfo(); - jobInfo.setState(state.name()); + jobInfo.setJobState(state.name()); return jobInfoService.update(jobInfo, Wrappers.update().eq("jobID", jobID).ne("state", TaskState.DELETE.name())); } @@ -86,34 +90,40 @@ public List createJobs(List jobs) { return null; } List entityList = new LinkedList<>(); + for (JobDetail job : jobs) { + // if running region not equal with admin region continue + if (!job.getRunningRegion().equals(properties.getRegion())) { + continue; + } EventMeshJobInfo entity = new EventMeshJobInfo(); - entity.setState(TaskState.INIT.name()); + entity.setJobState(TaskState.INIT.name()); entity.setTaskID(job.getTaskID()); entity.setJobType(job.getJobType().name()); - entity.setDesc(job.getDesc()); + entity.setJobDesc(job.getJobDesc()); String jobID = UUID.randomUUID().toString(); entity.setJobID(jobID); entity.setTransportType(job.getTransportType().name()); entity.setCreateUid(job.getCreateUid()); entity.setUpdateUid(job.getUpdateUid()); - entity.setFromRegion(job.getRegion()); + entity.setFromRegion(job.getFromRegion()); + entity.setRunningRegion(job.getRunningRegion()); CreateOrUpdateDataSourceReq source = new CreateOrUpdateDataSourceReq(); source.setType(job.getTransportType().getSrc()); source.setOperator(job.getCreateUid()); - source.setRegion(job.getRegion()); + source.setRegion(job.getSourceDataSource().getRegion()); source.setDesc(job.getSourceConnectorDesc()); - source.setConfig(job.getSourceDataSource()); + source.setConfig(job.getSourceDataSource().getConf()); EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source); entity.setSourceData(createdSource.getId()); CreateOrUpdateDataSourceReq sink = new CreateOrUpdateDataSourceReq(); sink.setType(job.getTransportType().getDst()); sink.setOperator(job.getCreateUid()); - sink.setRegion(job.getRegion()); + sink.setRegion(job.getSinkDataSource().getRegion()); sink.setDesc(job.getSinkConnectorDesc()); - sink.setConfig(job.getSinkDataSource()); - EventMeshDataSource createdSink = dataSourceBizService.createDataSource(source); + sink.setConfig(job.getSinkDataSource().getConf()); + EventMeshDataSource createdSink = dataSourceBizService.createDataSource(sink); entity.setTargetData(createdSink.getId()); entityList.add(entity); @@ -167,7 +177,7 @@ public JobDetail getJobDetail(String jobID) { detail.setSinkConnectorDesc(target.getDescription()); } - TaskState state = TaskState.fromIndex(job.getState()); + TaskState state = TaskState.fromIndex(job.getJobState()); if (state == null) { throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal job state in db"); } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java index b4fdc57af0..f686456135 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.admin.server.web.service.task; +import org.apache.eventmesh.admin.server.AdminServerProperties; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo; import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService; import org.apache.eventmesh.admin.server.web.pojo.JobDetail; @@ -24,13 +25,18 @@ import org.apache.eventmesh.common.remote.TaskState; import org.apache.eventmesh.common.remote.request.CreateTaskRequest; +import org.apache.commons.lang3.StringUtils; + import java.util.List; +import java.util.Random; import java.util.UUID; import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.client.RestTemplate; @Service public class TaskBizService { @@ -40,38 +46,67 @@ public class TaskBizService { @Autowired private JobInfoBizService jobInfoService; + @Autowired + private AdminServerProperties properties; + @Transactional public String createTask(CreateTaskRequest req) { - String taskID = UUID.randomUUID().toString(); + String taskID = req.getTaskId(); + if (StringUtils.isEmpty(taskID)) { + taskID = UUID.randomUUID().toString(); + req.setTaskId(taskID); + } + + String targetRegion = req.getTargetRegion(); + // not from other admin && target not equals with self region + if (!req.isFlag() && !StringUtils.equals(properties.getRegion(), targetRegion)) { + List adminServerList = properties.getAdminServerList().get(targetRegion); + if (adminServerList == null || adminServerList.isEmpty()) { + throw new RuntimeException("No admin server available for region: " + targetRegion); + } + String targetUrl = adminServerList.get(new Random().nextInt(adminServerList.size())) + "/eventmesh/admin/createTask"; + + RestTemplate restTemplate = new RestTemplate(); + req.setFlag(true); + ResponseEntity response = restTemplate.postForEntity(targetUrl, req, String.class); + if (!response.getStatusCode().is2xxSuccessful()) { + throw new RuntimeException("Failed to create task on admin server: " + targetUrl); + } + } + + String finalTaskID = taskID; List jobs = req.getJobs().stream().map(x -> { JobDetail job = parse(x); - job.setTaskID(taskID); - job.setRegion(req.getRegion()); + job.setTaskID(finalTaskID); job.setCreateUid(req.getUid()); job.setUpdateUid(req.getUid()); return job; }).collect(Collectors.toList()); jobInfoService.createJobs(jobs); EventMeshTaskInfo taskInfo = new EventMeshTaskInfo(); - taskInfo.setTaskID(taskID); - taskInfo.setName(req.getName()); - taskInfo.setDesc(req.getDesc()); - taskInfo.setState(TaskState.INIT.name()); + taskInfo.setTaskID(finalTaskID); + taskInfo.setTaskName(req.getTaskName()); + taskInfo.setTaskDesc(req.getTaskDesc()); + taskInfo.setTaskState(TaskState.INIT.name()); taskInfo.setCreateUid(req.getUid()); - taskInfo.setFromRegion(req.getRegion()); + taskInfo.setSourceRegion(req.getSourceRegion()); + taskInfo.setTargetRegion(req.getTargetRegion()); taskInfoService.save(taskInfo); - return taskID; + return finalTaskID; } private JobDetail parse(CreateTaskRequest.JobDetail src) { JobDetail dst = new JobDetail(); - dst.setDesc(src.getDesc()); + dst.setJobDesc(src.getJobDesc()); dst.setTransportType(src.getTransportType()); dst.setSourceConnectorDesc(src.getSourceConnectorDesc()); dst.setSourceDataSource(src.getSourceDataSource()); dst.setSinkConnectorDesc(src.getSinkConnectorDesc()); dst.setSinkDataSource(src.getSinkDataSource()); + // full/increase/check dst.setJobType(src.getJobType()); + dst.setFromRegion(src.getFromRegion()); + dst.setRunningRegion(src.getRunningRegion()); return dst; } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java index 95a88a23fa..82e7bc021d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java @@ -30,8 +30,12 @@ public enum TransportType { REDIS_REDIS(DataSourceType.REDIS, DataSourceType.REDIS), ROCKETMQ_ROCKETMQ(DataSourceType.ROCKETMQ, DataSourceType.ROCKETMQ), MYSQL_HTTP(DataSourceType.MYSQL, DataSourceType.HTTP), + ROCKETMQ_HTTP(DataSourceType.ROCKETMQ, DataSourceType.HTTP), HTTP_MYSQL(DataSourceType.HTTP, DataSourceType.MYSQL), - REDIS_MQ(DataSourceType.REDIS, DataSourceType.ROCKETMQ); + HTTP_REDIS(DataSourceType.HTTP, DataSourceType.REDIS), + HTTP_ROCKETMQ(DataSourceType.HTTP, DataSourceType.ROCKETMQ), + REDIS_MQ(DataSourceType.REDIS, DataSourceType.ROCKETMQ), + ; private static final Map INDEX_TYPES = new HashMap<>(); private static final TransportType[] TYPES = TransportType.values(); private static final String SEPARATOR = "@"; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java index 7af3812f24..afda984805 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java @@ -17,27 +17,30 @@ package org.apache.eventmesh.common.remote.datasource; +import org.apache.eventmesh.common.config.connector.Config; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig; + import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import lombok.Getter; +import lombok.Data; -@Getter +@Data public class DataSource { - private final DataSourceType type; + + private DataSourceType type; + private String desc; @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) @JsonSubTypes({ - @JsonSubTypes.Type(value = MySqlIncDataSourceSourceConf.class, name = "MySqlIncDataSourceSourceConf") + @JsonSubTypes.Type(value = CanalSourceConfig.class, name = "CanalSourceConfig"), + @JsonSubTypes.Type(value = CanalSinkConfig.class, name = "CanalSinkConfig") }) - private final DataSourceConf conf; - private final Class confClazz; + private Config conf; - public DataSource(DataSourceType type, DataSourceConf conf) { - this.type = type; - this.conf = conf; - this.confClazz = conf.getConfClass(); - } + private Class confClazz; + private String region; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java deleted file mode 100644 index f8c825e963..0000000000 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eventmesh.common.remote.datasource; - -import org.apache.eventmesh.common.config.connector.rdb.canal.SourceConnectorConfig; -import org.apache.eventmesh.common.remote.job.SyncConsistency; -import org.apache.eventmesh.common.remote.job.SyncMode; -import org.apache.eventmesh.common.remote.offset.RecordPosition; - -import java.util.List; - -public class MySqlIncDataSourceSourceConf extends DataSourceConf { - @Override - public Class getConfClass() { - return MySqlIncDataSourceSourceConf.class; - } - - private String destination; - - private Long canalInstanceId; - - private String desc; - - private boolean ddlSync = true; - - private boolean filterTableError = false; - - private Long slaveId; - - private Short clientId; - - private String serverUUID; - - private boolean isMariaDB = true; - - private boolean isGTIDMode = true; - - private Integer batchSize = 10000; - - private Long batchTimeout = -1L; - - private String tableFilter; - - private String fieldFilter; - - private List recordPositions; - - // ================================= channel parameter - // ================================ - - // enable remedy - private Boolean enableRemedy = false; - - // sync mode: field/row - private SyncMode syncMode; - - // sync consistency - private SyncConsistency syncConsistency; - - // ================================= system parameter - // ================================ - - // Column name of the bidirectional synchronization mark - private String needSyncMarkTableColumnName = "needSync"; - - // Column value of the bidirectional synchronization mark - private String needSyncMarkTableColumnValue = "needSync"; - - private SourceConnectorConfig sourceConnectorConfig; -} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java index 4ecf9b4527..fadfa68e75 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.common.remote.request; -import org.apache.eventmesh.common.remote.datasource.DataSource; +import org.apache.eventmesh.common.config.connector.Config; import org.apache.eventmesh.common.remote.datasource.DataSourceType; import lombok.Data; @@ -29,10 +29,11 @@ @Data @EqualsAndHashCode(callSuper = true) public class CreateOrUpdateDataSourceReq extends BaseRemoteRequest { + private Integer id; private DataSourceType type; private String desc; - private DataSource config; + private Config config; private String region; private String operator; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java index ce24e03416..47c45595af 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java @@ -30,16 +30,35 @@ */ @Data public class CreateTaskRequest { - private String name; - private String desc; + + private String taskId; + + // task name + private String taskName; + + // task description + private String taskDesc; + + // task owner or updater private String uid; + private List jobs; - private String region; + + // task source region + private String sourceRegion; + + // task target region + private String targetRegion; + + // mark request send by other region admin, default is false + private boolean flag = false; @Data public static class JobDetail { - private String desc; + private String jobDesc; + + // full/increase/check private JobType jobType; private DataSource sourceDataSource; @@ -51,5 +70,11 @@ public static class JobDetail { private String sinkConnectorDesc; private TransportType transportType; + + // job request from region + private String fromRegion; + + // job actually running region + private String runningRegion; } } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java index 7171b3fc27..caa5330fe3 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/RuntimeInstanceConfig.java @@ -28,6 +28,8 @@ @Config(path = "classPath://runtime.yaml") public class RuntimeInstanceConfig { + private boolean registryEnabled; + private String registryServerAddr; private String registryPluginType; @@ -36,7 +38,7 @@ public class RuntimeInstanceConfig { private String adminServiceName; - private String adminServerAddr; + private String adminServiceAddr; private ComponentType componentType; diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java index 0fade897f6..acea321e95 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java @@ -41,11 +41,11 @@ @Slf4j public class RuntimeInstance { - private String adminServerAddr = "127.0.0.1:8081"; + private String adminServiceAddr; private Map adminServerInfoMap = new HashMap<>(); - private final RegistryService registryService; + private RegistryService registryService; private Runtime runtime; @@ -57,29 +57,34 @@ public class RuntimeInstance { public RuntimeInstance(RuntimeInstanceConfig runtimeInstanceConfig) { this.runtimeInstanceConfig = runtimeInstanceConfig; - this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType()); + if (runtimeInstanceConfig.isRegistryEnabled()) { + this.registryService = RegistryFactory.getInstance(runtimeInstanceConfig.getRegistryPluginType()); + } } public void init() throws Exception { - registryService.init(); - QueryInstances queryInstances = new QueryInstances(); - queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName()); - queryInstances.setHealth(true); - List adminServerRegisterInfoList = registryService.selectInstances(queryInstances); - if (!adminServerRegisterInfoList.isEmpty()) { - adminServerAddr = getRandomAdminServerAddr(adminServerRegisterInfoList); - } else { - throw new RuntimeException("admin server address is empty, please check"); + if (registryService != null) { + registryService.init(); + QueryInstances queryInstances = new QueryInstances(); + queryInstances.setServiceName(runtimeInstanceConfig.getAdminServiceName()); + queryInstances.setHealth(true); + List adminServerRegisterInfoList = registryService.selectInstances(queryInstances); + if (!adminServerRegisterInfoList.isEmpty()) { + adminServiceAddr = getRandomAdminServerAddr(adminServerRegisterInfoList); + } else { + throw new RuntimeException("admin server address is empty, please check"); + } + // use registry adminServiceAddr value replace config + runtimeInstanceConfig.setAdminServiceAddr(adminServiceAddr); } - runtimeInstanceConfig.setAdminServerAddr(adminServerAddr); + runtimeFactory = initRuntimeFactory(runtimeInstanceConfig); runtime = runtimeFactory.createRuntime(runtimeInstanceConfig); runtime.init(); } public void start() throws Exception { - if (!StringUtils.isBlank(adminServerAddr)) { - + if (!StringUtils.isBlank(adminServiceAddr) && registryService != null) { registryService.subscribe((event) -> { log.info("runtime receive registry event: {}", event); List registerServerInfoList = event.getInstances(); @@ -91,7 +96,6 @@ public void start() throws Exception { adminServerInfoMap = registerServerInfoMap; updateAdminServerAddr(); } - }, runtimeInstanceConfig.getAdminServiceName()); runtime.start(); isStarted = true; @@ -106,14 +110,14 @@ public void shutdown() throws Exception { private void updateAdminServerAddr() throws Exception { if (isStarted) { - if (!adminServerInfoMap.containsKey(adminServerAddr)) { - adminServerAddr = getRandomAdminServerAddr(adminServerInfoMap); - log.info("admin server address changed to: {}", adminServerAddr); + if (!adminServerInfoMap.containsKey(adminServiceAddr)) { + adminServiceAddr = getRandomAdminServerAddr(adminServerInfoMap); + log.info("admin server address changed to: {}", adminServiceAddr); shutdown(); start(); } } else { - adminServerAddr = getRandomAdminServerAddr(adminServerInfoMap); + adminServiceAddr = getRandomAdminServerAddr(adminServerInfoMap); } } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index 6cd0452b83..1e589ebd97 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -150,7 +150,7 @@ public void init() throws Exception { private void initAdminService() { // create gRPC channel - channel = ManagedChannelBuilder.forTarget(runtimeInstanceConfig.getAdminServerAddr()).usePlaintext().build(); + channel = ManagedChannelBuilder.forTarget(runtimeInstanceConfig.getAdminServiceAddr()).usePlaintext().build(); adminServiceStub = AdminServiceGrpc.newStub(channel).withWaitForReady(); diff --git a/eventmesh-runtime-v2/src/main/resources/runtime.yaml b/eventmesh-runtime-v2/src/main/resources/runtime.yaml index 44c5f6f91f..c5ffac9d92 100644 --- a/eventmesh-runtime-v2/src/main/resources/runtime.yaml +++ b/eventmesh-runtime-v2/src/main/resources/runtime.yaml @@ -16,7 +16,9 @@ # componentType: CONNECTOR +registryEnabled: false registryServerAddr: 127.0.0.1:8085 registryPluginType: nacos storagePluginType: memory adminServiceName: eventmesh-admin +adminServiceAddr: "127.0.0.1:8085;127.0.0.1:8086"