From 37595613d1d8ed729bdea46633ac60fc9f9bb6be Mon Sep 17 00:00:00 2001 From: sodaRyCN <757083350@qq.com> Date: Tue, 30 Jul 2024 19:44:20 +0800 Subject: [PATCH] more admin --- eventmesh-admin-server/conf/eventmesh.sql | 134 +++++++++++++++++- .../conf/mapper/EventMeshDataSourceMapper.xml | 9 +- .../conf/mapper/EventMeshJobInfoMapper.xml | 16 ++- .../conf/mapper/EventMeshTaskInfoMapper.xml | 19 +-- .../conf/mapper/EventMeshVerifyMapper.xml | 42 ++++++ .../admin/server/web/HttpServer.java | 20 +++ .../eventmesh/admin/server/web/Response.java | 43 +++--- .../web/db/entity/EventMeshDataSource.java | 8 +- .../web/db/entity/EventMeshJobInfo.java | 40 +++++- .../web/db/entity/EventMeshTaskInfo.java | 40 ++++-- .../server/web/db/entity/EventMeshVerify.java | 53 +++++++ .../web/db/mapper/EventMeshJobInfoMapper.java | 3 +- .../db/mapper/EventMeshTaskInfoMapper.java | 4 + .../web/db/mapper/EventMeshVerifyMapper.java | 37 +++++ .../db/service/EventMeshJobInfoService.java | 35 +++-- .../db/service/EventMeshVerifyService.java | 29 ++++ .../impl/EventMeshJobInfoServiceImpl.java | 46 +++--- .../handler/impl/FetchJobRequestHandler.java | 35 +++-- .../handler/impl/FetchPositionHandler.java | 9 +- .../handler/impl/ReportHeartBeatHandler.java | 28 ++-- .../handler/impl/ReportPositionHandler.java | 47 +++--- .../web/handler/impl/ReportVerifyHandler.java | 49 +++++++ .../admin/server/web/pojo/JobDetail.java | 29 ++-- .../server/web/service/AdminGrpcServer.java | 17 +-- .../admin/server/web/service/AdminServer.java | 38 +---- .../datasource/DataSourceBizService.java | 48 +++++++ ...e.java => RuntimeHeartbeatBizService.java} | 2 +- ...BizService.java => JobInfoBizService.java} | 96 ++++++++----- ...izService.java => PositionBizService.java} | 6 +- .../position/impl/MysqlPositionHandler.java | 2 +- .../web/service/task/TaskBizService.java | 46 +++++- .../web/service/verify/VerifyBizService.java | 42 ++++++ .../{job/JobState.java => TaskState.java} | 17 ++- .../remote/{task => }/TransportType.java | 11 +- .../common/remote/datasource/DataSource.java | 39 +++-- .../remote/datasource/DataSourceClassify.java | 3 +- .../remote/datasource/DataSourceConf.java | 28 +--- .../datasource/DataSourceDriverType.java | 3 +- .../remote/datasource/DataSourceType.java | 36 +++-- .../MySqlIncDataSourceSourceConf.java | 85 +++++++++++ .../common/remote/job/JobConnectorConfig.java | 4 +- .../remote/request/BaseRemoteRequest.java | 2 +- ....java => CreateOrUpdateDataSourceReq.java} | 23 +-- .../remote/request/CreateTaskRequest.java | 55 +++++++ .../remote/request/ReportPositionRequest.java | 6 +- .../remote/request/ReportVerifyRequest.java | 2 + .../remote/response/BaseRemoteResponse.java | 2 - ...kResponse.java => CreateTaskResponse.java} | 6 +- .../remote/response/FetchJobResponse.java | 18 +-- ...{FailResponse.java => SimpleResponse.java} | 19 ++- .../common/remote/task/TaskState.java | 32 ----- .../eventmesh/common/utils/JsonUtils.java | 8 ++ .../offsetmgmt/admin/AdminOffsetService.java | 6 +- .../runtime/connector/ConnectorRuntime.java | 8 -- 54 files changed, 1068 insertions(+), 417 deletions(-) create mode 100644 eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml create mode 100644 eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java create mode 100644 eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshVerifyMapper.java create mode 100644 eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshVerifyService.java create mode 100644 eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java create mode 100644 eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java rename eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/{EventMeshRuntimeHeartbeatBizService.java => RuntimeHeartbeatBizService.java} (98%) rename eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/{EventMeshJobInfoBizService.java => JobInfoBizService.java} (55%) rename eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/{EventMeshPositionBizService.java => PositionBizService.java} (94%) create mode 100644 eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java rename eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/{job/JobState.java => TaskState.java} (75%) rename eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/{task => }/TransportType.java (84%) rename eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Admin.java => eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceConf.java (56%) create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java rename eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/{CreateOrUpdateTaskReq.java => CreateOrUpdateDataSourceReq.java} (65%) create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java rename eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/{EmptyAckResponse.java => CreateTaskResponse.java} (88%) rename eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/{FailResponse.java => SimpleResponse.java} (67%) delete mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/task/TaskState.java diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql index 8e9ed4c4c8..c5f1bf1a9a 100644 --- a/eventmesh-admin-server/conf/eventmesh.sql +++ b/eventmesh-admin-server/conf/eventmesh.sql @@ -1,34 +1,154 @@ -- -------------------------------------------------------- --- 主机: 192.168.56.102 --- 服务器版本: 8.0.37 - MySQL Community Server - GPL --- 服务器操作系统: Linux --- HeidiSQL 版本: 12.7.0.6850 +-- 主机: 127.0.0.1 +-- 服务器版本: 8.0.36 - MySQL Community Server - GPL +-- 服务器操作系统: Win64 +-- HeidiSQL 版本: 11.3.0.6295 -- -------------------------------------------------------- /*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; /*!40101 SET NAMES utf8 */; /*!50503 SET NAMES utf8mb4 */; -/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */; -/*!40103 SET TIME_ZONE='+00:00' */; /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; + +-- 导出 eventmesh 的数据库结构 +CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */; +USE `eventmesh`; + +-- 导出 表 eventmesh.event_mesh_data_source 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_data_source` ( + `id` int unsigned NOT NULL AUTO_INCREMENT, + `dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `description` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, + `configuration` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `region` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`) USING BTREE +) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; + +-- 数据导出被取消选择。 + +-- 导出 表 eventmesh.event_mesh_job_info 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_job_info` ( + `id` int unsigned NOT NULL AUTO_INCREMENT, + `jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, + `taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `transportType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `sourceData` int NOT NULL DEFAULT '0', + `targetData` int NOT NULL DEFAULT '0', + `state` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `jobType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `fromRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `createUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `updateUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`) USING BTREE, + UNIQUE KEY `jobID` (`jobID`) +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; + -- 数据导出被取消选择。 +-- 导出 表 eventmesh.event_mesh_mysql_position 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` ( + `id` int unsigned NOT NULL AUTO_INCREMENT, + `jobID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `serverUUID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, + `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `position` bigint DEFAULT NULL, + `gtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, + `currentGtid` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, + `timestamp` bigint DEFAULT NULL, + `journalName` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `jobID` (`jobID`) +) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC; + -- 数据导出被取消选择。 +-- 导出 表 eventmesh.event_mesh_position_reporter_history 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` ( + `id` bigint NOT NULL AUTO_INCREMENT, + `job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `record` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + KEY `job` (`job`), + KEY `address` (`address`) +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='记录position上报者变更时,老记录'; + -- 数据导出被取消选择。 +-- 导出 表 eventmesh.event_mesh_runtime_heartbeat 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` ( + `id` bigint unsigned NOT NULL AUTO_INCREMENT, + `adminAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `runtimeAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `jobID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime本地上报时间', + `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `runtimeAddr` (`runtimeAddr`), + KEY `jobID` (`jobID`) +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; + -- 数据导出被取消选择。 +-- 导出 表 eventmesh.event_mesh_runtime_history 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` ( + `id` bigint NOT NULL AUTO_INCREMENT, + `job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + KEY `address` (`address`) +) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='记录runtime上运行任务的变更'; + -- 数据导出被取消选择。 +-- 导出 表 eventmesh.event_mesh_task_info 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_task_info` ( + `id` int unsigned NOT NULL AUTO_INCREMENT, + `taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, + `name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, + `desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, + `state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'TaskState', + `fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '', + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`) USING BTREE, + UNIQUE KEY `taskID` (`taskID`) +) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; + -- 数据导出被取消选择。 +-- 导出 表 eventmesh.event_mesh_verify 结构 +CREATE TABLE IF NOT EXISTS `event_mesh_verify` ( + `id` int NOT NULL, + `taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `recordID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `recordSig` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `connectorName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `connectorStage` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `position` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL, + `createTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; + -- 数据导出被取消选择。 -/*!40103 SET TIME_ZONE=IFNULL(@OLD_TIME_ZONE, 'system') */; /*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */; /*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */; /*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */; diff --git a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml index a346c81bf6..d100e19033 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml @@ -25,18 +25,19 @@ - + - - + + + id,dataType,description, - configuration,createUid,updateUid, + configuration,region,createUid,updateUid, createTime,updateTime diff --git a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml index d7e65128ab..02e8806680 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml @@ -1,4 +1,5 @@ + +*/ +--> + @@ -25,15 +28,22 @@ + + + + + + id,jobID,desc, - taskID,state,jobType, - createTime,updateTime + taskID,transportType,sourceData, + targetData,state,jobType, + fromRegion,createTime,updateTime diff --git a/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml index 11074aab9d..05b1dc52a0 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml @@ -1,4 +1,5 @@ + +*/ +--> + @@ -25,20 +28,18 @@ - - - - - + + + id,taskID,name, - desc,transportType,sourceData, - targetData,state,createUid, - updateUid,createTime,updateTime + desc,state,fromRegion, + createUid,updateUid,createTime, + updateTime diff --git a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml new file mode 100644 index 0000000000..b7b042145a --- /dev/null +++ b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + id,taskID,recordID, + recordSig,connectorName,connectorStage, + position,createTime + + diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java index 3f91115bdc..bd896d546c 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java @@ -17,10 +17,30 @@ package org.apache.eventmesh.admin.server.web; +import org.apache.eventmesh.admin.server.web.service.task.TaskBizService; +import org.apache.eventmesh.common.remote.request.CreateTaskRequest; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/eventmesh/admin") public class HttpServer { + @Autowired + private TaskBizService taskService; + + @RequestMapping("/createTask") + public ResponseEntity> createOrUpdateTask(@RequestBody CreateTaskRequest task) { + String uuid = taskService.createTask(task); + return ResponseEntity.ok(Response.success(uuid)); + } + + public boolean deleteTask(Long id) { + return false; + } + + } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Response.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Response.java index d58312146c..329a00baae 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Response.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Response.java @@ -17,35 +17,44 @@ package org.apache.eventmesh.admin.server.web; -public class Response { +import org.apache.eventmesh.common.remote.exception.ErrorCode; +public class Response { + private int code; private boolean success; private String desc; private T data; - public boolean isSuccess() { - return success; - } - - public void setSuccess(boolean success) { - this.success = success; - } - - public String getDesc() { - return desc; + public static Response success() { + Response response = new Response<>(); + response.success = true; + response.code = ErrorCode.SUCCESS; + return response; } - public void setDesc(String desc) { - this.desc = desc; + public static Response success(T data) { + Response response = new Response<>(); + response.success = true; + response.data = data; + return response; } - public T getData() { - return data; + public static Response fail(int code, String desc) { + Response response = new Response<>(); + response.success = false; + response.code = code; + response.desc = desc; + return response; } - public void setData(T data) { - this.data = data; + public static Response fail(int code, String desc, T data) { + Response response = new Response<>(); + response.success = false; + response.code = code; + response.desc = desc; + response.data = data; + return response; } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java index 3a4bc453bd..e79d6cd9c6 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java @@ -35,15 +35,17 @@ public class EventMeshDataSource implements Serializable { @TableId(type = IdType.AUTO) private Integer id; - private Integer dataType; + private String dataType; private String description; private String configuration; - private Integer createUid; + private String region; - private Integer updateUid; + private String createUid; + + private String updateUid; private Date createTime; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java index a0690a7830..f198f50806 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java @@ -1,32 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.admin.server.web.db.entity; +import java.io.Serializable; +import java.util.Date; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; -import lombok.Data; -import java.io.Serializable; -import java.util.Date; +import lombok.Data; /** - * @TableName event_mesh_job_info + * TableName event_mesh_job_info */ @TableName(value ="event_mesh_job_info") @Data public class EventMeshJobInfo implements Serializable { @TableId(type = IdType.AUTO) private Integer id; - @TableId(type = IdType.ASSIGN_UUID) + private String jobID; private String desc; private String taskID; + private String transportType; + + private Integer sourceData; + + private Integer targetData; + private String state; private String jobType; + private String fromRegion; + + private String createUid; + + private String updateUid; + private Date createTime; private Date updateTime; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java index 9d25b44660..186da2dc6e 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshTaskInfo.java @@ -1,39 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.eventmesh.admin.server.web.db.entity; +import java.io.Serializable; +import java.util.Date; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; -import lombok.Data; -import java.io.Serializable; -import java.util.Date; +import lombok.Data; /** - * @TableName event_mesh_task_info + * TableName event_mesh_task_info */ @TableName(value ="event_mesh_task_info") @Data public class EventMeshTaskInfo implements Serializable { @TableId(type = IdType.AUTO) private Integer id; - @TableId(type = IdType.ASSIGN_UUID) + private String taskID; private String name; private String desc; - private String transportType; - - private Integer sourceData; - - private Integer targetData; - private String state; - private Integer createUid; + private String fromRegion; + + private String createUid; - private Integer updateUid; + private String updateUid; private Date createTime; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java new file mode 100644 index 0000000000..c1cb7c0796 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.entity; + +import java.io.Serializable; +import java.util.Date; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +import lombok.Data; + +/** + * TableName event_mesh_verify + */ +@TableName(value ="event_mesh_verify") +@Data +public class EventMeshVerify implements Serializable { + @TableId(type = IdType.AUTO) + private Integer id; + + private String taskID; + + private String recordID; + + private String recordSig; + + private String connectorName; + + private String connectorStage; + + private String position; + + private Date createTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java index eb57c0af2c..39f8a4aed6 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java @@ -25,8 +25,7 @@ /** * for table 'event_mesh_job_info' db operation - * 2024-05-09 15:51:45 - * entity.db.web.server.admin.eventmesh.apache.org.EventMeshJobInfo + * entity org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo */ @Mapper public interface EventMeshJobInfoMapper extends BaseMapper { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshTaskInfoMapper.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshTaskInfoMapper.java index e882fd3f34..b9d226fc34 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshTaskInfoMapper.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshTaskInfoMapper.java @@ -18,12 +18,16 @@ package org.apache.eventmesh.admin.server.web.db.mapper; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo; + +import org.apache.ibatis.annotations.Mapper; + import com.baomidou.mybatisplus.core.mapper.BaseMapper; /** * event_mesh_task_info * Entity org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo */ +@Mapper public interface EventMeshTaskInfoMapper extends BaseMapper { } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshVerifyMapper.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshVerifyMapper.java new file mode 100644 index 0000000000..4919fc84f9 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshVerifyMapper.java @@ -0,0 +1,37 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.eventmesh.admin.server.web.db.mapper; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify; + +import org.apache.ibatis.annotations.Mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** +* event_mesh_verify +* Entity org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify +*/ +@Mapper +public interface EventMeshVerifyMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java index c5ad399854..16dc36c119 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java @@ -1,29 +1,28 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ package org.apache.eventmesh.admin.server.web.db.service; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; - import com.baomidou.mybatisplus.extension.service.IService; /** -* for table `event_mesh_job_info' db operation -* 2024-05-09 15:51:45 +* event_mesh_job_info */ public interface EventMeshJobInfoService extends IService { + } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshVerifyService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshVerifyService.java new file mode 100644 index 0000000000..6aa896fd83 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshVerifyService.java @@ -0,0 +1,29 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.eventmesh.admin.server.web.db.service; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify; + +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* event_mesh_verify +*/ +public interface EventMeshVerifyService extends IService { + +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java index dd7312ceae..0e0de26ea3 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java @@ -1,40 +1,34 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ package org.apache.eventmesh.admin.server.web.db.service.impl; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; -import org.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper; import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; - +import org.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper; import org.springframework.stereotype.Service; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; - -import lombok.extern.slf4j.Slf4j; - /** - * for table 'event_mesh_job_info' db operation - * 2024-05-09 15:51:45 - */ +* event_mesh_job_info +*/ @Service -@Slf4j public class EventMeshJobInfoServiceImpl extends ServiceImpl - implements EventMeshJobInfoService { + implements EventMeshJobInfoService{ } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java index 4c15139566..8f159fa45b 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java @@ -17,13 +17,15 @@ package org.apache.eventmesh.admin.server.web.handler.impl; -import org.apache.eventmesh.admin.server.AdminServerRuntimeException; import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; -import org.apache.eventmesh.admin.server.web.service.job.EventMeshJobInfoBizService; +import org.apache.eventmesh.admin.server.web.pojo.JobDetail; +import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.job.JobConnectorConfig; import org.apache.eventmesh.common.remote.request.FetchJobRequest; import org.apache.eventmesh.common.remote.response.FetchJobResponse; +import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.commons.lang3.StringUtils; @@ -37,34 +39,29 @@ public class FetchJobRequestHandler extends BaseRequestHandler { @Autowired - EventMeshJobInfoBizService jobInfoBizService; + JobInfoBizService jobInfoBizService; @Override public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) { if (StringUtils.isBlank(request.getJobID())) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "job id is empty"); - } - int jobID; - try { - jobID = Integer.parseInt(request.getJobID()); - } catch (NumberFormatException e) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, String.format("illegal job id %s", - request.getJobID())); + return FetchJobResponse.failResponse(ErrorCode.BAD_REQUEST, "job id is empty"); } FetchJobResponse response = FetchJobResponse.successResponse(); - EventMeshJobDetail detail = jobInfoBizService.getJobDetail(jobID); + JobDetail detail = jobInfoBizService.getJobDetail(request.getJobID()); if (detail == null) { return response; } - response.setId(detail.getId()); - response.setName(detail.getName()); - response.setSourceConnectorConfig(detail.getSourceConnectorConfig()); - response.setSourceConnectorDesc(detail.getSourceConnectorDesc()); + response.setId(detail.getJobID()); + JobConnectorConfig config = new JobConnectorConfig(); + config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource())); + config.setSourceConnectorDesc(detail.getSourceConnectorDesc()); + config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource())); + config.setSourceConnectorDesc(detail.getSinkConnectorDesc()); + response.setConnectorConfig(config); response.setTransportType(detail.getTransportType()); - response.setSinkConnectorConfig(detail.getSinkConnectorConfig()); - response.setSourceConnectorDesc(detail.getSinkConnectorDesc()); response.setState(detail.getState()); - response.setPosition(detail.getPosition()); + response.setPosition(detail.getPositions()); + response.setType(detail.getJobType()); return response; } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java index 2e6fa31f05..85ef0e6113 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java @@ -17,10 +17,9 @@ package org.apache.eventmesh.admin.server.web.handler.impl; -import org.apache.eventmesh.admin.server.AdminServerRuntimeException; import org.apache.eventmesh.admin.server.web.db.DBThreadPool; import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; -import org.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService; +import org.apache.eventmesh.admin.server.web.service.position.PositionBizService; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; @@ -41,15 +40,15 @@ public class FetchPositionHandler extends BaseRequestHandler { +public class ReportHeartBeatHandler extends BaseRequestHandler { @Autowired - EventMeshRuntimeHeartbeatBizService heartbeatBizService; + RuntimeHeartbeatBizService heartbeatBizService; @Autowired DBThreadPool executor; @Override - protected EmptyAckResponse handler(ReportHeartBeatRequest request, Metadata metadata) { + protected SimpleResponse handler(ReportHeartBeatRequest request, Metadata metadata) { + if (StringUtils.isBlank(request.getJobID()) || StringUtils.isBlank(request.getAddress())) { + log.info("request [{}] id or reporter address is empty", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request id or reporter address is empty"); + } executor.getExecutors().execute(() -> { EventMeshRuntimeHeartbeat heartbeat = new EventMeshRuntimeHeartbeat(); - int job; - try { - job = Integer.parseInt(request.getJobID()); - } catch (NumberFormatException e) { - log.warn("runtime {} report heartbeat fail, illegal job id {}", request.getAddress(), request.getJobID()); - return; - } - heartbeat.setJobID(job); + heartbeat.setJobID(request.getJobID()); heartbeat.setReportTime(request.getReportedTimeStamp()); heartbeat.setAdminAddr(IPUtils.getLocalAddress()); heartbeat.setRuntimeAddr(request.getAddress()); @@ -65,6 +65,6 @@ protected EmptyAckResponse handler(ReportHeartBeatRequest request, Metadata meta } }); - return new EmptyAckResponse(); + return SimpleResponse.success(); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java index 0616608541..5e2a968262 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java @@ -17,16 +17,15 @@ package org.apache.eventmesh.admin.server.web.handler.impl; -import org.apache.eventmesh.admin.server.AdminServerRuntimeException; import org.apache.eventmesh.admin.server.web.db.DBThreadPool; -import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; -import org.apache.eventmesh.admin.server.web.service.job.EventMeshJobInfoBizService; -import org.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService; +import org.apache.eventmesh.admin.server.web.pojo.JobDetail; +import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; +import org.apache.eventmesh.admin.server.web.service.position.PositionBizService; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; -import org.apache.eventmesh.common.remote.response.EmptyAckResponse; +import org.apache.eventmesh.common.remote.response.SimpleResponse; import org.apache.commons.lang3.StringUtils; @@ -37,35 +36,33 @@ @Component @Slf4j -public class ReportPositionHandler extends BaseRequestHandler { +public class ReportPositionHandler extends BaseRequestHandler { @Autowired - private EventMeshJobInfoBizService jobInfoBizService; + private JobInfoBizService jobInfoBizService; @Autowired private DBThreadPool executor; @Autowired - private EventMeshPositionBizService positionBizService; - + private PositionBizService positionBizService; @Override - protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metadata) { + protected SimpleResponse handler(ReportPositionRequest request, Metadata metadata) { + if (StringUtils.isBlank(request.getJobID())) { + log.info("request [{}] illegal job id", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); + } if (request.getDataSourceType() == null) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal data type, it's empty"); + log.info("request [{}] illegal data type", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal data type, it's empty"); } if (StringUtils.isBlank(request.getJobID())) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); + log.info("request [{}] illegal job id", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); } if (request.getRecordPositionList() == null || request.getRecordPositionList().isEmpty()) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal record position list, it's empty"); - } - int jobID; - - try { - jobID = Integer.parseInt(request.getJobID()); - } catch (NumberFormatException e) { - throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, String.format("illegal job id [%s] format", - request.getJobID())); + log.info("request [{}] illegal record position", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal record position list, it's empty"); } positionBizService.isValidatePositionRequest(request.getDataSourceType()); @@ -88,10 +85,10 @@ protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metad log.warn("handle position request fail, request [{}]", request, e); } finally { try { - EventMeshJobInfo detail = jobInfoBizService.getJobDetail(jobID); - if (detail != null && !detail.getState().equals(request.getState()) && !jobInfoBizService.updateJobState(jobID, + JobDetail detail = jobInfoBizService.getJobDetail(request.getJobID()); + if (detail != null && !detail.getState().equals(request.getState()) && !jobInfoBizService.updateJobState(request.getJobID(), request.getState())) { - log.warn("update job [{}] old state [{}] to [{}] fail", jobID, detail.getState(), request.getState()); + log.warn("update job [{}] old state [{}] to [{}] fail", request.getJobID(), detail.getState(), request.getState()); } } catch (Exception e) { log.warn("update job id [{}] type [{}] state [{}] fail", request.getJobID(), @@ -99,6 +96,6 @@ protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metad } } }); - return new EmptyAckResponse(); + return SimpleResponse.success(); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java new file mode 100644 index 0000000000..4f3dcf6230 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.handler.impl; + +import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; +import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; +import org.apache.eventmesh.common.remote.response.SimpleResponse; + +import org.apache.commons.lang3.StringUtils; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Component +@Slf4j +public class ReportVerifyHandler extends BaseRequestHandler { + @Autowired + private VerifyBizService verifyService; + + @Override + protected SimpleResponse handler(ReportVerifyRequest request, Metadata metadata) { + if (StringUtils.isAnyBlank(request.getTaskID(), request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) { + log.info("report verify request [{}] illegal", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task id, sign, record id or stage is none"); + } + return verifyService.reportVerifyRecord(request) ? SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save verify " + + "request fail"); + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java index 33a986fb75..c47b284483 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/JobDetail.java @@ -17,35 +17,48 @@ package org.apache.eventmesh.admin.server.web.pojo; -import lombok.Data; +import org.apache.eventmesh.common.remote.TaskState; +import org.apache.eventmesh.common.remote.TransportType; +import org.apache.eventmesh.common.remote.datasource.DataSource; +import org.apache.eventmesh.common.remote.job.JobType; import org.apache.eventmesh.common.remote.offset.RecordPosition; -import org.apache.eventmesh.common.remote.task.TransportType; import java.util.Date; import java.util.List; +import lombok.Data; + @Data public class JobDetail { private Integer id; + private String jobID; + private String desc; + private String taskID; - private String state; + private TaskState state; - private String jobType; + private JobType jobType; private Date createTime; private Date updateTime; - private String sourceConfig; + private String createUid; + + private String updateUid; + + private String region; + + private DataSource sourceDataSource; - private String sourceDesc; + private String sourceConnectorDesc; - private String targetConfig; + private DataSource sinkDataSource; - private String targetDesc; + private String sinkConnectorDesc; private TransportType transportType; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminGrpcServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminGrpcServer.java index 9876f5516a..bc822ad6c3 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminGrpcServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminGrpcServer.java @@ -26,8 +26,7 @@ import org.apache.eventmesh.common.remote.payload.PayloadUtil; import org.apache.eventmesh.common.remote.request.BaseRemoteRequest; import org.apache.eventmesh.common.remote.response.BaseRemoteResponse; -import org.apache.eventmesh.common.remote.response.EmptyAckResponse; -import org.apache.eventmesh.common.remote.response.FailResponse; +import org.apache.eventmesh.common.remote.response.SimpleResponse; import org.apache.commons.lang3.StringUtils; @@ -48,24 +47,26 @@ public class AdminGrpcServer extends AdminServiceGrpc.AdminServiceImplBase { private Payload process(Payload value) { if (value == null || StringUtils.isBlank(value.getMetadata().getType())) { - return PayloadUtil.from(FailResponse.build(ErrorCode.BAD_REQUEST, "bad request: type not exists")); + return PayloadUtil.from(SimpleResponse.fail(ErrorCode.BAD_REQUEST, "bad request: type not exists")); } try { BaseRequestHandler handler = handlerFactory.getHandler(value.getMetadata().getType()); if (handler == null) { - return PayloadUtil.from(FailResponse.build(BaseRemoteResponse.UNKNOWN, "not match any request handler")); + return PayloadUtil.from(SimpleResponse.fail(ErrorCode.BAD_REQUEST, "not match any request handler")); } BaseRemoteResponse response = handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), value.getMetadata()); - if (response == null || response instanceof EmptyAckResponse) { - return null; + if (response == null) { + log.warn("received request type [{}] handler [{}], then replay empty response", value.getMetadata().getType(), + handler.getClass().getName()); + response = SimpleResponse.success(); } return PayloadUtil.from(response); } catch (Exception e) { log.warn("process payload {} fail", value.getMetadata().getType(), e); if (e instanceof AdminServerRuntimeException) { - return PayloadUtil.from(FailResponse.build(((AdminServerRuntimeException) e).getCode(), e.getMessage())); + return PayloadUtil.from(SimpleResponse.fail(((AdminServerRuntimeException) e).getCode(), e.getMessage())); } - return PayloadUtil.from(FailResponse.build(ErrorCode.INTERNAL_ERR, "admin server internal err")); + return PayloadUtil.from(SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "admin server internal err")); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminServer.java index 94e27ae4d2..fd7582800d 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/AdminServer.java @@ -17,16 +17,14 @@ package org.apache.eventmesh.admin.server.web.service; -import org.apache.eventmesh.admin.server.web.Admin; import org.apache.eventmesh.admin.server.AdminServerProperties; import org.apache.eventmesh.admin.server.AdminServerRuntimeException; +import org.apache.eventmesh.common.ComponentLifeCycle; import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.common.config.CommonConfiguration; import org.apache.eventmesh.common.config.ConfigService; import org.apache.eventmesh.common.remote.exception.ErrorCode; -import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; import org.apache.eventmesh.common.utils.IPUtils; -import org.apache.eventmesh.common.utils.PagedList; import org.apache.eventmesh.registry.RegisterServerInfo; import org.apache.eventmesh.registry.RegistryFactory; import org.apache.eventmesh.registry.RegistryService; @@ -43,8 +41,7 @@ @Service @Slf4j -public class AdminServer implements Admin, ApplicationListener { - +public class AdminServer implements ComponentLifeCycle, ApplicationListener { private final RegistryService registryService; private final RegisterServerInfo adminServeInfo; @@ -69,37 +66,6 @@ public AdminServer(AdminServerProperties properties) { registryService = RegistryFactory.getInstance(configuration.getEventMeshRegistryPluginType()); } - - @Override - public Task createOrUpdateTask(Task task) { - if (task.getId() == null) { - - } else { - - } - return null; - } - - @Override - public boolean deleteTask(Long id) { - return false; - } - - @Override - public Task getTask(Long id) { - return null; - } - - @Override - public PagedList getTaskPaged(Task task) { - return null; - } - - @Override - public void reportHeartbeat(ReportHeartBeatRequest heartBeat) { - - } - @Override @PostConstruct public void start() { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java new file mode 100644 index 0000000000..433847a4cd --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/datasource/DataSourceBizService.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.service.datasource; + +import org.apache.eventmesh.admin.server.AdminServerRuntimeException; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.CreateOrUpdateDataSourceReq; +import org.apache.eventmesh.common.utils.JsonUtils; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class DataSourceBizService { + @Autowired + private EventMeshDataSourceService dataSourceService; + + public EventMeshDataSource createDataSource(CreateOrUpdateDataSourceReq dataSource) { + EventMeshDataSource entity = new EventMeshDataSource(); + entity.setConfiguration(JsonUtils.toJSONString(dataSource.getConfig())); + entity.setDataType(dataSource.getType().name()); + entity.setCreateUid(dataSource.getOperator()); + entity.setUpdateUid(dataSource.getOperator()); + entity.setRegion(dataSource.getRegion()); + entity.setDescription(dataSource.getDesc()); + if (dataSourceService.save(entity)) { + return entity; + } + throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "save data source fail"); + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/RuntimeHeartbeatBizService.java similarity index 98% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/RuntimeHeartbeatBizService.java index 4fa80b270a..95dff6e5b3 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/heatbeat/RuntimeHeartbeatBizService.java @@ -34,7 +34,7 @@ */ @Service @Slf4j -public class EventMeshRuntimeHeartbeatBizService { +public class RuntimeHeartbeatBizService { @Autowired EventMeshRuntimeHistoryService historyService; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java similarity index 55% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java index 9b7df8dc3f..9225690ab0 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java @@ -23,28 +23,28 @@ import org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService; import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; -import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService; import org.apache.eventmesh.admin.server.web.pojo.JobDetail; -import org.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService; -import org.apache.eventmesh.common.remote.job.JobState; -import org.apache.eventmesh.common.remote.job.JobType; -import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService; +import org.apache.eventmesh.admin.server.web.service.position.PositionBizService; +import org.apache.eventmesh.common.remote.TaskState; +import org.apache.eventmesh.common.remote.TransportType; +import org.apache.eventmesh.common.remote.datasource.DataSource; import org.apache.eventmesh.common.remote.datasource.DataSourceType; -import org.apache.eventmesh.common.remote.task.TransportType; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.CreateOrUpdateDataSourceReq; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.commons.lang3.StringUtils; import java.util.LinkedList; import java.util.List; -import java.util.Map; +import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import com.baomidou.mybatisplus.core.toolkit.Wrappers; -import com.fasterxml.jackson.core.type.TypeReference; import lombok.extern.slf4j.Slf4j; @@ -54,47 +54,75 @@ */ @Service @Slf4j -public class EventMeshJobInfoBizService { +public class JobInfoBizService { @Autowired - EventMeshJobInfoService jobInfoService; + private EventMeshJobInfoService jobInfoService; @Autowired - EventMeshJobInfoExtService jobInfoExtService; + private EventMeshJobInfoExtService jobInfoExtService; @Autowired - EventMeshTaskInfoService taskInfoService; + private DataSourceBizService dataSourceBizService; @Autowired - EventMeshDataSourceService dataSourceService; + private EventMeshDataSourceService dataSourceService; @Autowired - EventMeshPositionBizService positionBizService; + private PositionBizService positionBizService; - public boolean updateJobState(String jobID, JobState state) { + public boolean updateJobState(String jobID, TaskState state) { if (jobID == null || state == null) { return false; } EventMeshJobInfo jobInfo = new EventMeshJobInfo(); jobInfo.setState(state.name()); - jobInfoService.update(jobInfo, Wrappers.update().eq("jobID", jobID).ne("state", JobState.DELETE.name())); - return true; + return jobInfoService.update(jobInfo, Wrappers.update().eq("jobID", jobID).ne("state", TaskState.DELETE.name())); } @Transactional - public List createJobs(String taskID, List type) { + public List createJobs(List jobs) { + if (jobs == null || jobs.isEmpty() || jobs.stream().anyMatch(job -> StringUtils.isBlank(job.getTaskID()))) { + log.warn("when create jobs, task id is empty or jobs config is empty "); + return null; + } List entityList = new LinkedList<>(); - for (JobType jobType : type) { - EventMeshJobInfo job = new EventMeshJobInfo(); - job.setState(JobState.INIT.name()); - job.setTaskID(taskID); - job.setJobType(jobType.name()); - entityList.add(job); + for (JobDetail job : jobs) { + CreateOrUpdateDataSourceReq source = new CreateOrUpdateDataSourceReq(); + source.setType(job.getTransportType().getSrc()); + source.setOperator(job.getCreateUid()); + source.setRegion(job.getRegion()); + source.setDesc(job.getSourceConnectorDesc()); + source.setConfig(job.getSourceDataSource()); + EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source); + + CreateOrUpdateDataSourceReq sink = new CreateOrUpdateDataSourceReq(); + sink.setType(job.getTransportType().getDst()); + sink.setOperator(job.getCreateUid()); + sink.setRegion(job.getRegion()); + sink.setDesc(job.getSinkConnectorDesc()); + sink.setConfig(job.getSinkDataSource()); + + EventMeshDataSource createdSink = dataSourceBizService.createDataSource(source); + String jobID = UUID.randomUUID().toString(); + EventMeshJobInfo entity = new EventMeshJobInfo(); + entity.setState(TaskState.INIT.name()); + entity.setTaskID(job.getTaskID()); + entity.setJobType(job.getJobType().name()); + entity.setDesc(job.getDesc()); + entity.setSourceData(createdSource.getId()); + entity.setTargetData(createdSink.getId()); + entity.setJobID(jobID); + entity.setTransportType(job.getTransportType().name()); + entity.setCreateUid(job.getCreateUid()); + entity.setUpdateUid(job.getUpdateUid()); + entity.setFromRegion(job.getRegion()); + entityList.add(entity); } int changed = jobInfoExtService.batchSave(entityList); - if (changed != type.size()) { - throw new AdminServerRuntimeException(ErrorCode.INTERNAL_ERR, String.format("create [%d] jobs of task [%s] not match expect [%d]", - changed, taskID, type.size())); + if (changed != jobs.size()) { + throw new AdminServerRuntimeException(ErrorCode.INTERNAL_ERR, String.format("create [%d] jobs of not match expect [%d]", + changed, jobs.size())); } return entityList; } @@ -115,9 +143,7 @@ public JobDetail getJobDetail(String jobID) { if (source != null) { if (!StringUtils.isBlank(source.getConfiguration())) { try { - detail.setSourceConnectorConfig(JsonUtils.parseTypeReferenceObject(source.getConfiguration(), - new TypeReference>() { - })); + detail.setSourceDataSource(JsonUtils.parseObject(source.getConfiguration(), DataSource.class)); } catch (Exception e) { log.warn("parse source config id [{}] fail", job.getSourceData(), e); throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal source data source config"); @@ -125,7 +151,7 @@ public JobDetail getJobDetail(String jobID) { } detail.setSourceConnectorDesc(source.getDescription()); if (source.getDataType() != null) { - detail.setPosition(positionBizService.getPositionByJobID(job.getJobID(), + detail.setPositions(positionBizService.getPositionByJobID(job.getJobID(), DataSourceType.getDataSourceType(source.getDataType()))); } @@ -133,9 +159,7 @@ public JobDetail getJobDetail(String jobID) { if (target != null) { if (!StringUtils.isBlank(target.getConfiguration())) { try { - detail.setSinkConnectorConfig(JsonUtils.parseTypeReferenceObject(target.getConfiguration(), - new TypeReference>() { - })); + detail.setSinkDataSource(JsonUtils.parseObject(target.getConfiguration(), DataSource.class)); } catch (Exception e) { log.warn("parse sink config id [{}] fail", job.getSourceData(), e); throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal target data sink config"); @@ -144,12 +168,12 @@ public JobDetail getJobDetail(String jobID) { detail.setSinkConnectorDesc(target.getDescription()); } - JobState state = JobState.fromIndex(job.getState()); + TaskState state = TaskState.fromIndex(job.getState()); if (state == null) { throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal job state in db"); } detail.setState(state); - detail.setTransportType(TransportType.getJobTransportType(job.getTransportType())); + detail.setTransportType(TransportType.getTransportType(job.getTransportType())); return detail; } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionBizService.java similarity index 94% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionBizService.java index 389f06df5a..cbd44ba8c2 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionBizService.java @@ -34,7 +34,7 @@ @Service @Slf4j -public class EventMeshPositionBizService { +public class PositionBizService { @Autowired PositionHandlerFactory factory; @@ -70,14 +70,14 @@ public boolean reportPosition(ReportPositionRequest request, Metadata metadata) return handler.handler(request, metadata); } - public List getPositionByJobID(Integer jobID, DataSourceType type) { + public List getPositionByJobID(String jobID, DataSourceType type) { if (jobID == null || type == null) { return null; } isValidatePositionRequest(type); PositionHandler handler = factory.getHandler(type); FetchPositionRequest request = new FetchPositionRequest(); - request.setJobID(String.valueOf(jobID)); + request.setJobID(jobID); return handler.handler(request, null); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java index 950263f0f9..352ba57e96 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java @@ -142,7 +142,7 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) { return false; } EventMeshMysqlPosition position = new EventMeshMysqlPosition(); - position.setJobID(Integer.parseInt(request.getJobID())); + position.setJobID(request.getJobID()); position.setAddress(request.getAddress()); CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset(); if (offset != null) { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java index cae0ad3f0d..b4fdc57af0 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java @@ -17,23 +17,61 @@ package org.apache.eventmesh.admin.server.web.service.task; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo; import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService; +import org.apache.eventmesh.admin.server.web.pojo.JobDetail; +import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; +import org.apache.eventmesh.common.remote.TaskState; +import org.apache.eventmesh.common.remote.request.CreateTaskRequest; + +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.UUID; - @Service public class TaskBizService { @Autowired private EventMeshTaskInfoService taskInfoService; + @Autowired + private JobInfoBizService jobInfoService; @Transactional - public void createTask() { - String uuid = UUID.randomUUID().toString(); + public String createTask(CreateTaskRequest req) { + String taskID = UUID.randomUUID().toString(); + List jobs = req.getJobs().stream().map(x -> { + JobDetail job = parse(x); + job.setTaskID(taskID); + job.setRegion(req.getRegion()); + job.setCreateUid(req.getUid()); + job.setUpdateUid(req.getUid()); + return job; + }).collect(Collectors.toList()); + jobInfoService.createJobs(jobs); + EventMeshTaskInfo taskInfo = new EventMeshTaskInfo(); + taskInfo.setTaskID(taskID); + taskInfo.setName(req.getName()); + taskInfo.setDesc(req.getDesc()); + taskInfo.setState(TaskState.INIT.name()); + taskInfo.setCreateUid(req.getUid()); + taskInfo.setFromRegion(req.getRegion()); + taskInfoService.save(taskInfo); + return taskID; + } + private JobDetail parse(CreateTaskRequest.JobDetail src) { + JobDetail dst = new JobDetail(); + dst.setDesc(src.getDesc()); + dst.setTransportType(src.getTransportType()); + dst.setSourceConnectorDesc(src.getSourceConnectorDesc()); + dst.setSourceDataSource(src.getSourceDataSource()); + dst.setSinkConnectorDesc(src.getSinkConnectorDesc()); + dst.setSinkDataSource(src.getSinkDataSource()); + dst.setJobType(src.getJobType()); + return dst; } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java new file mode 100644 index 0000000000..74f208b199 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.service.verify; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshVerifyService; +import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class VerifyBizService { + @Autowired + private EventMeshVerifyService verifyService; + + public boolean reportVerifyRecord(ReportVerifyRequest request) { + EventMeshVerify verify = new EventMeshVerify(); + verify.setRecordID(request.getRecordID()); + verify.setRecordSig(request.getRecordSig()); + verify.setPosition(request.getPosition()); + verify.setTaskID(request.getTaskID()); + verify.setConnectorName(request.getConnectorName()); + verify.setConnectorStage(request.getConnectorStage()); + return verifyService.save(verify); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobState.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TaskState.java similarity index 75% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobState.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TaskState.java index 1376642996..606339c443 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobState.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TaskState.java @@ -15,22 +15,25 @@ * limitations under the License. */ -package org.apache.eventmesh.common.remote.job; +package org.apache.eventmesh.common.remote; import java.util.HashMap; import java.util.Map; -public enum JobState { +import lombok.ToString; + +@ToString +public enum TaskState { INIT, STARTED, RUNNING, PAUSE, COMPLETE, DELETE, FAIL; - private static final JobState[] STATES_NUM_INDEX = JobState.values(); - private static final Map STATES_NAME_INDEX = new HashMap<>(); + private static final TaskState[] STATES_NUM_INDEX = TaskState.values(); + private static final Map STATES_NAME_INDEX = new HashMap<>(); static { - for (JobState jobState : STATES_NUM_INDEX) { + for (TaskState jobState : STATES_NUM_INDEX) { STATES_NAME_INDEX.put(jobState.name(), jobState); } } - public static JobState fromIndex(Integer index) { + public static TaskState fromIndex(Integer index) { if (index == null || index < 0 || index >= STATES_NUM_INDEX.length) { return null; } @@ -38,7 +41,7 @@ public static JobState fromIndex(Integer index) { return STATES_NUM_INDEX[index]; } - public static JobState fromIndex(String index) { + public static TaskState fromIndex(String index) { if (index == null || index.isEmpty()) { return null; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/task/TransportType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java similarity index 84% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/task/TransportType.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java index 01d49beb7c..95a88a23fa 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/task/TransportType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.common.remote.task; +package org.apache.eventmesh.common.remote; import org.apache.eventmesh.common.remote.datasource.DataSourceType; @@ -28,7 +28,10 @@ public enum TransportType { MYSQL_MYSQL(DataSourceType.MYSQL, DataSourceType.MYSQL), REDIS_REDIS(DataSourceType.REDIS, DataSourceType.REDIS), - ROCKETMQ_ROCKETMQ(DataSourceType.ROCKETMQ, DataSourceType.ROCKETMQ); + ROCKETMQ_ROCKETMQ(DataSourceType.ROCKETMQ, DataSourceType.ROCKETMQ), + MYSQL_HTTP(DataSourceType.MYSQL, DataSourceType.HTTP), + HTTP_MYSQL(DataSourceType.HTTP, DataSourceType.MYSQL), + REDIS_MQ(DataSourceType.REDIS, DataSourceType.ROCKETMQ); private static final Map INDEX_TYPES = new HashMap<>(); private static final TransportType[] TYPES = TransportType.values(); private static final String SEPARATOR = "@"; @@ -49,14 +52,14 @@ public enum TransportType { } - public static TransportType getJobTransportType(String index) { + public static TransportType getTransportType(String index) { if (index == null || index.isEmpty()) { return null; } return INDEX_TYPES.get(index); } - public static TransportType getJobTransportType(Integer index) { + public static TransportType getTransportType(Integer index) { if (index == null || index < 0 || index >= TYPES.length) { return null; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java index f3a83611d4..7af3812f24 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSource.java @@ -17,28 +17,27 @@ package org.apache.eventmesh.common.remote.datasource; -import lombok.Data; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; -@Data -public class DataSource { - private Integer id; - - private Integer dataType; - - private String description; +import lombok.Getter; - private String sourceUser; - private String sourcePasswd; - private String targetUser; - private String targetPasswd; - private int sourceType; - private int targetType; - - private Integer createUid; - - private Integer updateUid; +@Getter +public class DataSource { + private final DataSourceType type; + private String desc; + @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) + @JsonSubTypes({ + @JsonSubTypes.Type(value = MySqlIncDataSourceSourceConf.class, name = "MySqlIncDataSourceSourceConf") + }) + private final DataSourceConf conf; + private final Class confClazz; + + public DataSource(DataSourceType type, DataSourceConf conf) { + this.type = type; + this.conf = conf; + this.confClazz = conf.getConfClass(); + } - private String createTime; - private String updateTime; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceClassify.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceClassify.java index 24a417125b..8cb01c9204 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceClassify.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceClassify.java @@ -21,5 +21,6 @@ public enum DataSourceClassify { // relationship db RDB, MQ, - CACHE; + CACHE, + TUNNEL; } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Admin.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceConf.java similarity index 56% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Admin.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceConf.java index 38fbbc112d..9701a9fa11 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/Admin.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceConf.java @@ -15,31 +15,9 @@ * limitations under the License. */ -package org.apache.eventmesh.admin.server.web; +package org.apache.eventmesh.common.remote.datasource; -import org.apache.eventmesh.common.ComponentLifeCycle; -import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest; -import org.apache.eventmesh.common.utils.PagedList; -/** - * Admin - */ -public interface Admin extends ComponentLifeCycle { - - /** - * support for web or ops - **/ - Task createOrUpdateTask(Task task); - - boolean deleteTask(Long id); - - Task getTask(Long id); - - // paged list - PagedList getTaskPaged(Task task); - - /** - * support for task - */ - void reportHeartbeat(ReportHeartBeatRequest heartBeat); +public abstract class DataSourceConf { + public abstract Class getConfClass(); } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java index 5854fd8a35..4429bee5a9 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceDriverType.java @@ -20,5 +20,6 @@ public enum DataSourceDriverType { MYSQL, REDIS, - ROCKETMQ; + ROCKETMQ, + HTTP; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java index f3d37090fe..985f311b92 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/DataSourceType.java @@ -17,10 +17,27 @@ package org.apache.eventmesh.common.remote.datasource; +import java.util.HashMap; +import java.util.Map; + +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString public enum DataSourceType { MYSQL("MySQL", DataSourceDriverType.MYSQL, DataSourceClassify.RDB), REDIS("Redis", DataSourceDriverType.REDIS, DataSourceClassify.CACHE), - ROCKETMQ("RocketMQ", DataSourceDriverType.ROCKETMQ, DataSourceClassify.MQ); + ROCKETMQ("RocketMQ", DataSourceDriverType.ROCKETMQ, DataSourceClassify.MQ), + HTTP("HTTP", DataSourceDriverType.HTTP, DataSourceClassify.TUNNEL); + private static final Map INDEX_TYPES = new HashMap<>(); + private static final DataSourceType[] TYPES = DataSourceType.values(); + static { + for (DataSourceType type : TYPES) { + INDEX_TYPES.put(type.name(), type); + } + } + private final String name; private final DataSourceDriverType driverType; private final DataSourceClassify classify; @@ -31,20 +48,13 @@ public enum DataSourceType { this.classify = classify; } - public String getName() { - return name; - } - - public DataSourceDriverType getDriverType() { - return driverType; - } - - public DataSourceClassify getClassify() { - return classify; + public static DataSourceType getDataSourceType(String index) { + if (index == null || index.isEmpty()) { + return null; + } + return INDEX_TYPES.get(index); } - private static final DataSourceType[] TYPES = DataSourceType.values(); - public static DataSourceType getDataSourceType(Integer index) { if (index == null || index < 0 || index >= TYPES.length) { return null; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java new file mode 100644 index 0000000000..f8c825e963 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/datasource/MySqlIncDataSourceSourceConf.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.datasource; + +import org.apache.eventmesh.common.config.connector.rdb.canal.SourceConnectorConfig; +import org.apache.eventmesh.common.remote.job.SyncConsistency; +import org.apache.eventmesh.common.remote.job.SyncMode; +import org.apache.eventmesh.common.remote.offset.RecordPosition; + +import java.util.List; + +public class MySqlIncDataSourceSourceConf extends DataSourceConf { + @Override + public Class getConfClass() { + return MySqlIncDataSourceSourceConf.class; + } + + private String destination; + + private Long canalInstanceId; + + private String desc; + + private boolean ddlSync = true; + + private boolean filterTableError = false; + + private Long slaveId; + + private Short clientId; + + private String serverUUID; + + private boolean isMariaDB = true; + + private boolean isGTIDMode = true; + + private Integer batchSize = 10000; + + private Long batchTimeout = -1L; + + private String tableFilter; + + private String fieldFilter; + + private List recordPositions; + + // ================================= channel parameter + // ================================ + + // enable remedy + private Boolean enableRemedy = false; + + // sync mode: field/row + private SyncMode syncMode; + + // sync consistency + private SyncConsistency syncConsistency; + + // ================================= system parameter + // ================================ + + // Column name of the bidirectional synchronization mark + private String needSyncMarkTableColumnName = "needSync"; + + // Column value of the bidirectional synchronization mark + private String needSyncMarkTableColumnValue = "needSync"; + + private SourceConnectorConfig sourceConnectorConfig; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobConnectorConfig.java index 91f0888fcd..14e8178cf3 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobConnectorConfig.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobConnectorConfig.java @@ -17,10 +17,10 @@ package org.apache.eventmesh.common.remote.job; -import lombok.Data; - import java.util.Map; +import lombok.Data; + /** * Description: */ diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseRemoteRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseRemoteRequest.java index 3eba07836a..b8c4c06207 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseRemoteRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseRemoteRequest.java @@ -27,7 +27,7 @@ @Getter public abstract class BaseRemoteRequest implements IPayload { - private Map header = new HashMap<>(); + private final Map header = new HashMap<>(); public void addHeader(String key, String value) { if (key == null || value == null) { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateTaskReq.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java similarity index 65% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateTaskReq.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java index b2c799e69a..4ecf9b4527 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateTaskReq.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateOrUpdateDataSourceReq.java @@ -17,21 +17,22 @@ package org.apache.eventmesh.common.remote.request; -import lombok.Data; -import org.apache.eventmesh.common.remote.job.JobConnectorConfig; -import org.apache.eventmesh.common.remote.task.TaskState; +import org.apache.eventmesh.common.remote.datasource.DataSource; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; -import java.util.List; +import lombok.Data; +import lombok.EqualsAndHashCode; /** - * Description: create Task without task id, otherwise update task + * create or update datasource with custom data source config */ @Data -public class CreateOrUpdateTaskReq { - private String taskID; - private String name; +@EqualsAndHashCode(callSuper = true) +public class CreateOrUpdateDataSourceReq extends BaseRemoteRequest { + private Integer id; + private DataSourceType type; private String desc; - private TaskState state; - private String uid; - private List job; + private DataSource config; + private String region; + private String operator; } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java new file mode 100644 index 0000000000..ce24e03416 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.request; + +import org.apache.eventmesh.common.remote.TransportType; +import org.apache.eventmesh.common.remote.datasource.DataSource; +import org.apache.eventmesh.common.remote.job.JobType; + +import java.util.List; + +import lombok.Data; + +/** + * Description: create task without task id, otherwise update task + */ +@Data +public class CreateTaskRequest { + private String name; + private String desc; + private String uid; + private List jobs; + private String region; + + @Data + public static class JobDetail { + private String desc; + + private JobType jobType; + + private DataSource sourceDataSource; + + private String sourceConnectorDesc; + + private DataSource sinkDataSource; + + private String sinkConnectorDesc; + + private TransportType transportType; + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java index 476d977951..42694d5675 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java @@ -17,7 +17,7 @@ package org.apache.eventmesh.common.remote.request; -import org.apache.eventmesh.common.remote.job.JobState; +import org.apache.eventmesh.common.remote.TaskState; import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordPosition; @@ -25,16 +25,18 @@ import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.ToString; @Data @EqualsAndHashCode(callSuper = true) +@ToString public class ReportPositionRequest extends BaseRemoteRequest { private String jobID; private List recordPositionList; - private JobState state; + private TaskState state; private String address; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java index 87f4581eb5..cd541949f4 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java @@ -19,9 +19,11 @@ import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.ToString; @Data @EqualsAndHashCode(callSuper = true) +@ToString public class ReportVerifyRequest extends BaseRemoteRequest { private String taskID; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java index b6f5daa565..3ea8401535 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java @@ -27,8 +27,6 @@ @Getter public abstract class BaseRemoteResponse implements IPayload { - - public static final int UNKNOWN = -1; @Setter private boolean success = true; @Setter diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java similarity index 88% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java index e51091fe94..a6f5628d6f 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/EmptyAckResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java @@ -17,9 +17,5 @@ package org.apache.eventmesh.common.remote.response; -/** - * empty, just mean remote received request - */ -public class EmptyAckResponse extends BaseRemoteResponse { - +public class CreateTaskResponse extends BaseRemoteResponse { } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java index 49948dbee7..95d2d157e0 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java @@ -17,23 +17,23 @@ package org.apache.eventmesh.common.remote.response; -import lombok.Data; -import lombok.EqualsAndHashCode; +import org.apache.eventmesh.common.remote.TaskState; +import org.apache.eventmesh.common.remote.TransportType; import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.job.JobConnectorConfig; -import org.apache.eventmesh.common.remote.job.JobState; +import org.apache.eventmesh.common.remote.job.JobType; import org.apache.eventmesh.common.remote.offset.RecordPosition; -import org.apache.eventmesh.common.remote.task.TransportType; import java.util.List; +import lombok.Data; +import lombok.EqualsAndHashCode; + @Data @EqualsAndHashCode(callSuper = true) public class FetchJobResponse extends BaseRemoteResponse { - private Integer id; - - private String name; + private String id; private TransportType transportType; @@ -41,7 +41,9 @@ public class FetchJobResponse extends BaseRemoteResponse { private List position; - private JobState state; + private TaskState state; + + private JobType type; public static FetchJobResponse successResponse() { FetchJobResponse response = new FetchJobResponse(); diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/SimpleResponse.java similarity index 67% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/SimpleResponse.java index d1d01dc59c..a4cdd52f99 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/SimpleResponse.java @@ -17,9 +17,18 @@ package org.apache.eventmesh.common.remote.response; -public class FailResponse extends BaseRemoteResponse { - public static FailResponse build(int errorCode, String msg) { - FailResponse response = new FailResponse(); +import org.apache.eventmesh.common.remote.exception.ErrorCode; + +public class SimpleResponse extends BaseRemoteResponse { + /** + * just mean remote received or process success + */ + public static SimpleResponse success() { + return new SimpleResponse(); + } + + public static SimpleResponse fail(int errorCode, String msg) { + SimpleResponse response = new SimpleResponse(); response.setErrorCode(errorCode); response.setDesc(msg); response.setSuccess(false); @@ -33,7 +42,7 @@ public static FailResponse build(int errorCode, String msg) { * @param exception exception * @return response */ - public static FailResponse build(Throwable exception) { - return build(BaseRemoteResponse.UNKNOWN, exception.getMessage()); + public static SimpleResponse fail(Throwable exception) { + return fail(ErrorCode.INTERNAL_ERR, exception.getMessage()); } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/task/TaskState.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/task/TaskState.java deleted file mode 100644 index 6a37c4ea3e..0000000000 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/task/TaskState.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.apache.eventmesh.common.remote.task; - -import java.util.HashMap; -import java.util.Map; - -public enum TaskState { - CREATE,DELETE,PAUSE,COMPLETE; - private static final TaskState[] STATES_NUM_INDEX = TaskState.values(); - private static final Map STATES_NAME_INDEX = new HashMap<>(); - static { - - for (TaskState taskState : STATES_NUM_INDEX) { - STATES_NAME_INDEX.put(taskState.name(), taskState); - } - } - - public static TaskState fromIndex(Integer index) { - if (index == null || index < 0 || index > STATES_NUM_INDEX.length) { - return null; - } - - return STATES_NUM_INDEX[index]; - } - - public static TaskState fromIndex(String index) { - if (index == null || index.isEmpty()) { - return null; - } - - return STATES_NAME_INDEX.get(index); - } -} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java index 7fa762d67b..bf91957032 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java @@ -62,6 +62,14 @@ public static T mapToObject(Map map, Class beanClass) { return beanClass.cast(obj); } + public static Map objectToMap(Object obj) { + if (obj == null) { + return null; + } + return OBJECT_MAPPER.convertValue(obj, new TypeReference>() { + }); + } + /** * Serialize object to json string. * diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java index 5f23ed31c8..08270fc024 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java @@ -23,7 +23,7 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; -import org.apache.eventmesh.common.remote.job.JobState; +import org.apache.eventmesh.common.remote.TaskState; import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordOffset; import org.apache.eventmesh.common.remote.offset.RecordPartition; @@ -71,7 +71,7 @@ public class AdminOffsetService implements OffsetManagementService { private String jobId; - private JobState jobState; + private TaskState jobState; private DataSourceType dataSourceType; @@ -271,7 +271,7 @@ public void onCompleted() { log.info("init record offset {}", initialRecordOffsetMap); positionStore.putAll(initialRecordOffsetMap); } - this.jobState = JobState.RUNNING; + this.jobState = TaskState.RUNNING; this.jobId = offsetStorageConfig.getExtensions().get("jobId"); } } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index e7bbbdbd48..0335a09568 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -17,13 +17,6 @@ package org.apache.eventmesh.runtime.connector; -import com.google.protobuf.Any; -import com.google.protobuf.UnsafeByteOperations; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.stub.StreamObserver; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; import org.apache.eventmesh.api.consumer.Consumer; import org.apache.eventmesh.api.factory.StoragePluginFactory; import org.apache.eventmesh.api.producer.Producer; @@ -70,7 +63,6 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.*; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException;