diff --git a/eventmesh-admin-server/bin/stop-admin.sh b/eventmesh-admin-server/bin/stop-admin.sh new file mode 100644 index 0000000000..207531d7fa --- /dev/null +++ b/eventmesh-admin-server/bin/stop-admin.sh @@ -0,0 +1,88 @@ +#!/bin/bash +# +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Detect operating system +OS=$(uname) + +EVENTMESH_ADMIN_HOME=`cd $(dirname $0)/.. && pwd` + +export EVENTMESH_ADMIN_HOME + +function get_pid { + local ppid="" + if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then + ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file) + # If the process does not exist, it indicates that the previous process terminated abnormally. + if [ ! -d /proc/$ppid ]; then + # Remove the residual file and return an error status. + rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file + echo -e "ERROR\t EventMesh admin process had already terminated unexpectedly before, please check log output." + ppid="" + fi + else + if [[ $OS =~ Msys ]]; then + # There is a Bug on Msys that may not be able to kill the identified process + ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # Known problem: grep Java may not be able to accurately identify Java processes + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + # It is required to identify the process as accurately as possible on Linux + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + fi + fi + echo "$ppid"; +} + +pid=$(get_pid) +if [[ $pid == "ERROR"* ]]; then + echo -e "${pid}" + exit 9 +fi +if [ -z "$pid" ];then + echo -e "ERROR\t No EventMesh admin server running." + exit 9 +fi + +kill ${pid} +echo "Send shutdown request to EventMesh admin(${pid}) OK" + +[[ $OS =~ Msys ]] && PS_PARAM=" -W " +stop_timeout=60 +for no in $(seq 1 $stop_timeout); do + if ps $PS_PARAM -p "$pid" 2>&1 > /dev/null; then + if [ $no -lt $stop_timeout ]; then + echo "[$no] server shutting down ..." + sleep 1 + continue + fi + + echo "shutdown server timeout, kill process: $pid" + kill -9 $pid; sleep 1; break; + echo "`date +'%Y-%m-%-d %H:%M:%S'` , pid : [$pid] , error message : abnormal shutdown which can not be closed within 60s" > ../logs/shutdown.error + else + echo "shutdown server ok!"; break; + fi +done + +if [ -f "pid-admin.file" ]; then + rm pid-admin.file +fi + + diff --git a/eventmesh-admin-server/conf/application.yaml b/eventmesh-admin-server/conf/application.yaml index 3d702e579e..7765d90ce8 100644 --- a/eventmesh-admin-server/conf/application.yaml +++ b/eventmesh-admin-server/conf/application.yaml @@ -21,6 +21,24 @@ spring: username: //db_username password: //db_password driver-class-name: com.mysql.cj.jdbc.Driver + initialSize: 1 + minIdle: 1 + maxActive: 20 + maxWait: 10000 + timeBetweenEvictionRunsMillis: 60000 + minEvictableIdleTimeMillis: 300000 + validationQuery: SELECT 1 FROM DUAL + testWhileIdle: true + testOnBorrow: false + testOnReturn: false + poolPreparedStatements: false + maxPoolPreparedStatementPerConnectionSize: 20 + filters: stat + connectionProperties: "druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000" +# secret keys +sysPubKey: +appPrivKey: + mybatis-plus: mapper-locations: classpath:mapper/*.xml configuration: @@ -35,8 +53,6 @@ event-mesh: # grpc server port port: 8081 adminServerList: - R1: - - http://localhost:8082 - R2: - - http://localhost:8082 + R1: http://localhost:8082;http://localhost:8082 + R2: http://localhost:8092;http://localhost:8092 region: R1 \ No newline at end of file diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql index 6e28daca8a..4d11ab1585 100644 --- a/eventmesh-admin-server/conf/eventmesh.sql +++ b/eventmesh-admin-server/conf/eventmesh.sql @@ -146,6 +146,39 @@ CREATE TABLE IF NOT EXISTS `event_mesh_verify` ( PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; +-- eventmesh.event_mesh_weredis_position definition +CREATE TABLE `event_mesh_weredis_position` ( + `id` int(10) unsigned NOT NULL AUTO_INCREMENT, + `jobID` varchar(50) COLLATE utf8_bin NOT NULL DEFAULT '', + `address` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `clusterName` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `partitionName` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `masterReplid` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `host` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `replOffset` bigint(20) NOT NULL DEFAULT '-1', + `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `jobID` (`jobID`) +) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; + + +CREATE TABLE `event_mesh_monitor` ( + `id` int(10) unsigned NOT NULL AUTO_INCREMENT, + `taskID` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `address` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `transportType` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `connectorStage` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `totalReqNum` bigint DEFAULT NULL, + `totalTimeCost` bigint DEFAULT NULL, + `maxTimeCost` bigint DEFAULT NULL, + `avgTimeCost` bigint DEFAULT NULL, + `tps` double DEFAULT NULL, + `createTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`) +) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin; + /*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */; /*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */; /*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */; diff --git a/eventmesh-admin-server/conf/log4j2.xml b/eventmesh-admin-server/conf/log4j2.xml index 6341a0e629..acc6acb8ba 100644 --- a/eventmesh-admin-server/conf/log4j2.xml +++ b/eventmesh-admin-server/conf/log4j2.xml @@ -28,74 +28,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/eventmesh-admin-server/conf/mapper/EventMeshMonitorMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshMonitorMapper.xml new file mode 100644 index 0000000000..f77fb8ba77 --- /dev/null +++ b/eventmesh-admin-server/conf/mapper/EventMeshMonitorMapper.xml @@ -0,0 +1,46 @@ + + + + + + + + + + + + + + + + + + + + + + + + id,taskID,jobID,address,transportType,connectorStage, + totalReqNum,totalTimeCost,maxTimeCost,avgTimeCost, + tps,createTime + + diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java index 612d398078..2e6d3c018a 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServerProperties.java @@ -17,7 +17,6 @@ package org.apache.eventmesh.admin.server; -import java.util.List; import java.util.Map; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -35,6 +34,6 @@ public class AdminServerProperties { private String configurationPath; private String configurationFile; private String serviceName; - private Map> adminServerList; + private Map adminServerList; private String region; } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java index b179a790c5..d5c52f58bc 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java @@ -17,17 +17,22 @@ package org.apache.eventmesh.admin.server; -import org.apache.eventmesh.admin.server.constatns.AdminServerConstants; +import org.apache.eventmesh.admin.server.constants.AdminServerConstants; import org.apache.eventmesh.common.config.ConfigService; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; -@SpringBootApplication(scanBasePackages = "org.apache.eventmesh.admin.server") +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@SpringBootApplication(scanBasePackages = "org.apache.eventmesh.admin.server", exclude = {DataSourceAutoConfiguration.class}) public class ExampleAdminServer { public static void main(String[] args) throws Exception { ConfigService.getInstance().setConfigPath(AdminServerConstants.EVENTMESH_CONF_HOME).setRootConfig(AdminServerConstants.EVENTMESH_CONF_FILE); SpringApplication.run(ExampleAdminServer.class); + log.info("wedts-admin start success."); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/constatns/AdminServerConstants.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/constants/AdminServerConstants.java similarity index 95% rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/constatns/AdminServerConstants.java rename to eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/constants/AdminServerConstants.java index 44afaca1c2..8ed079fd31 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/constatns/AdminServerConstants.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/constants/AdminServerConstants.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.eventmesh.admin.server.constatns; +package org.apache.eventmesh.admin.server.constants; public class AdminServerConstants { public static final String CONF_ENV = "configurationPath"; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java index 2454e9f02c..0a20d8645e 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java @@ -17,15 +17,28 @@ package org.apache.eventmesh.admin.server.web; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService; +import org.apache.eventmesh.admin.server.web.service.monitor.MonitorBizService; import org.apache.eventmesh.admin.server.web.service.task.TaskBizService; import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService; import org.apache.eventmesh.common.remote.request.CreateTaskRequest; +import org.apache.eventmesh.common.remote.request.QueryTaskInfoRequest; +import org.apache.eventmesh.common.remote.request.QueryTaskMonitorRequest; +import org.apache.eventmesh.common.remote.request.ReportMonitorRequest; import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; +import org.apache.eventmesh.common.remote.request.TaskBachRequest; +import org.apache.eventmesh.common.remote.request.TaskIDRequest; import org.apache.eventmesh.common.remote.response.CreateTaskResponse; +import org.apache.eventmesh.common.remote.response.HttpResponseResult; +import org.apache.eventmesh.common.remote.response.QueryTaskInfoResponse; +import org.apache.eventmesh.common.remote.response.QueryTaskMonitorResponse; +import org.apache.eventmesh.common.remote.response.SimpleResponse; import org.apache.eventmesh.common.utils.JsonUtils; +import java.util.ArrayList; +import java.util.List; + import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -44,30 +57,127 @@ public class HttpServer { @Autowired private VerifyBizService verifyService; + @Autowired + private MonitorBizService monitorService; + + @Autowired + private EventMeshTaskInfoService taskInfoService; + @RequestMapping(value = "/createTask", method = RequestMethod.POST) - public ResponseEntity createOrUpdateTask(@RequestBody CreateTaskRequest task) { + public String createOrUpdateTask(@RequestBody CreateTaskRequest task) { log.info("receive http proto create task:{}", task); CreateTaskResponse createTaskResponse = taskService.createTask(task); log.info("receive http proto create task result:{}", createTaskResponse); - return ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse))); + SimpleResponse simpleResponse = new SimpleResponse(); + simpleResponse.setData(createTaskResponse); + return JsonUtils.toJSONString(simpleResponse); } @RequestMapping(value = "/reportVerify", method = RequestMethod.POST) - public ResponseEntity reportVerify(@RequestBody ReportVerifyRequest request) { + public String reportVerify(@RequestBody ReportVerifyRequest request) { log.info("receive http proto report verify request:{}", request); boolean result = verifyService.reportVerifyRecord(request); log.info("receive http proto report verify result:{}", result); + SimpleResponse simpleResponse = new SimpleResponse(); + simpleResponse.setData(result); + return JsonUtils.toJSONString(simpleResponse); + } + + @RequestMapping(value = "/reportMonitor", method = RequestMethod.POST) + public String reportMonitor(@RequestBody ReportMonitorRequest request) { + log.info("receive http proto report monitor request:{}", request); + boolean result = monitorService.reportMonitorRecord(request); + log.info("receive http proto report monitor result:{}", result); + SimpleResponse simpleResponse = new SimpleResponse(); + simpleResponse.setData(result); + return JsonUtils.toJSONString(simpleResponse); + } + + @RequestMapping(value = "/queryTaskMonitor", method = RequestMethod.POST) + public String queryTaskMonitor(@RequestBody QueryTaskMonitorRequest request) { + log.info("receive http proto query task monitor request:{}", request); + QueryTaskMonitorResponse result = monitorService.queryTaskMonitors(request); + log.info("receive http proto query task monitor result:{}", result); + SimpleResponse simpleResponse = new SimpleResponse(); + simpleResponse.setData(result); + return JsonUtils.toJSONString(simpleResponse); + } + + @RequestMapping(value = "/queryTaskInfo", method = RequestMethod.POST) + public HttpResponseResult queryTaskInfo(@RequestBody QueryTaskInfoRequest taskInfoRequest) { + log.info("receive http query task info request:{}", taskInfoRequest); + List taskInfosResponse = taskService.queryTaskInfo(taskInfoRequest); + log.info("receive http query task info taskInfosResponse:{}", taskInfoRequest); + if (taskInfosResponse.isEmpty()) { + return HttpResponseResult.failed("NOT FOUND"); + } + return HttpResponseResult.success(taskInfosResponse); + } + + @RequestMapping(value = "/deleteTask", method = RequestMethod.DELETE) + public HttpResponseResult deleteTask(@RequestBody TaskIDRequest taskIDRequest) { + log.info("receive need to delete taskID:{}", taskIDRequest.getTaskID()); + boolean result = taskService.deleteTaskByTaskID(taskIDRequest); if (result) { - return ResponseEntity.ok("report verify success.request:" + JsonUtils.toJSONString(request)); + return HttpResponseResult.success(); } else { - return ResponseEntity.internalServerError().body("report verify success.request:" + JsonUtils.toJSONString(request)); + return HttpResponseResult.failed(); } } - public boolean deleteTask(Long id) { - return false; + @RequestMapping(value = "/startTask", method = RequestMethod.POST) + public HttpResponseResult startTask(@RequestBody TaskIDRequest taskIDRequest) { + log.info("receive start task ID:{}", taskIDRequest.getTaskID()); + taskService.startTask(taskIDRequest); + return HttpResponseResult.success(); + } + + @RequestMapping(value = "/restartTask", method = RequestMethod.POST) + public HttpResponseResult restartTask(@RequestBody TaskIDRequest taskIDRequest) { + log.info("receive restart task ID:{}", taskIDRequest.getTaskID()); + taskService.restartTask(taskIDRequest); + return HttpResponseResult.success(); } + @RequestMapping(value = "/stopTask", method = RequestMethod.POST) + public HttpResponseResult stopTask(@RequestBody TaskIDRequest taskIDRequest) { + log.info("receive stop task ID:{}", taskIDRequest.getTaskID()); + taskService.stopTask(taskIDRequest); + return HttpResponseResult.success(); + } + + @RequestMapping(value = "/restartBatch", method = RequestMethod.POST) + public HttpResponseResult restartBatch(@RequestBody List taskBachRequestList) { + log.info("receive restart batch task IDs:{}", taskBachRequestList); + List errorNames = new ArrayList<>(); + taskService.restartBatchTask(taskBachRequestList, errorNames); + if (!errorNames.isEmpty()) { + return HttpResponseResult.failed(errorNames); + } + return HttpResponseResult.success(); + } + + @RequestMapping(value = "stopBatch", method = RequestMethod.POST) + public HttpResponseResult stopBatch(@RequestBody List taskBachRequestList) { + log.info("receive stop batch task IDs:{}", taskBachRequestList); + List errorNames = new ArrayList<>(); + taskService.stopBatchTask(taskBachRequestList, errorNames); + if (!errorNames.isEmpty()) { + return HttpResponseResult.failed(errorNames); + } + return HttpResponseResult.success(); + } + + @RequestMapping(value = "/startBatch", method = RequestMethod.POST) + public HttpResponseResult startBatch(@RequestBody List taskBachRequestList) { + log.info("receive start batch task IDs:{}", taskBachRequestList); + List errorNames = new ArrayList<>(); + taskService.startBatchTask(taskBachRequestList, errorNames); + if (!errorNames.isEmpty()) { + return HttpResponseResult.failed(errorNames); + } + return HttpResponseResult.success(); + } -} +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/config/MybatisPlusConfig.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/config/MybatisPlusConfig.java new file mode 100644 index 0000000000..15d362bcd0 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/config/MybatisPlusConfig.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.baomidou.mybatisplus.annotation.DbType; +import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; +import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; + +@Configuration +public class MybatisPlusConfig { + + @Bean + public MybatisPlusInterceptor paginationInterceptor() { + MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); + PaginationInnerInterceptor pageInterceptor = new PaginationInnerInterceptor(DbType.MYSQL); + pageInterceptor.setMaxLimit(500L); + interceptor.addInnerInterceptor(pageInterceptor); + return interceptor; + } + +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DruidDataSource.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DruidDataSource.java new file mode 100644 index 0000000000..fb26d44d30 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DruidDataSource.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db; + + +import org.apache.eventmesh.admin.server.web.utils.EncryptUtil; +import org.apache.eventmesh.admin.server.web.utils.ParamType; + +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.sql.SQLException; + +import javax.sql.DataSource; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +import lombok.extern.slf4j.Slf4j; + +@Configuration +@ComponentScan +@Slf4j +public class DruidDataSource { + + @Value("${spring.datasource.url}") + private String dbUrl; + + @Value("${spring.datasource.username}") + private String username; + + @Value("${spring.datasource.password}") + private String password; + + @Value("${spring.datasource.driver-class-name}") + private String driverClassName; + + @Value("${spring.datasource.initialSize}") + private int initialSize; + + @Value("${spring.datasource.minIdle}") + private int minIdle; + + @Value("${spring.datasource.maxActive}") + private int maxActive; + + @Value("${spring.datasource.maxWait}") + private int maxWait; + + @Value("${spring.datasource.timeBetweenEvictionRunsMillis}") + private int timeBetweenEvictionRunsMillis; + + @Value("${spring.datasource.minEvictableIdleTimeMillis}") + private int minEvictableIdleTimeMillis; + + @Value("${spring.datasource.validationQuery}") + private String validationQuery; + + @Value("${spring.datasource.testWhileIdle}") + private boolean testWhileIdle; + + @Value("${spring.datasource.testOnBorrow}") + private boolean testOnBorrow; + + @Value("${spring.datasource.testOnReturn}") + private boolean testOnReturn; + + @Value("${spring.datasource.poolPreparedStatements}") + private boolean poolPreparedStatements; + + @Value("${spring.datasource.maxPoolPreparedStatementPerConnectionSize}") + private int maxPoolPreparedStatementPerConnectionSize; + + @Value("${spring.datasource.filters}") + private String filters; + + @Value("{spring.datasource.connectionProperties}") + private String connectionProperties; + + @Value("${sysPubKey}") + private String sysPubKeyStr; + + @Value("${appPrivKey}") + private String appPrivKeyStr; + + + @Bean + @Primary + public DataSource dataSource() throws Exception { + try (com.alibaba.druid.pool.DruidDataSource datasource = new com.alibaba.druid.pool.DruidDataSource()) { + datasource.setUrl(this.dbUrl); + datasource.setUsername(username); + datasource.setPassword(rsaDecrypt(sysPubKeyStr, appPrivKeyStr, password)); + datasource.setDriverClassName(driverClassName); + datasource.setInitialSize(initialSize); + datasource.setMinIdle(minIdle); + datasource.setMaxActive(maxActive); + datasource.setMaxWait(maxWait); + datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); + datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); + datasource.setValidationQuery(validationQuery); + datasource.setTestWhileIdle(testWhileIdle); + datasource.setTestOnBorrow(testOnBorrow); + datasource.setTestOnReturn(testOnReturn); + datasource.setPoolPreparedStatements(poolPreparedStatements); + datasource.setMaxPoolPreparedStatementPerConnectionSize(maxPoolPreparedStatementPerConnectionSize); + try { + datasource.setFilters(filters); + } catch (SQLException e) { + log.error("druid configuration initialization filter", e); + } + datasource.setConnectionProperties(connectionProperties); + + return datasource; + } + } + + public static String rsaDecrypt(String sysPubKeyStr, String appPrivKeyStr, String encrtyptText) throws IOException { + if (StringUtils.isNotBlank(encrtyptText) && encrtyptText.length() > "{RSA}".length() && encrtyptText.startsWith("{RSA}")) { + String text = encrtyptText.startsWith("{RSA}") ? encrtyptText.substring("{RSA}".length()) : encrtyptText; + + try { + return EncryptUtil.decrypt(ParamType.STRING, sysPubKeyStr, ParamType.STRING, appPrivKeyStr, ParamType.STRING, text); + } catch (Exception e) { + throw new RuntimeException("decrypt error", e); + } + } else { + return encrtyptText; + } + } + +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMonitor.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMonitor.java new file mode 100644 index 0000000000..0507464b5b --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshMonitor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.entity; + +import java.io.Serializable; +import java.util.Date; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +import lombok.Data; + +/** + * TableName event_mesh_monitor + */ +@TableName(value = "event_mesh_monitor") +@Data +public class EventMeshMonitor implements Serializable { + + @TableId(type = IdType.AUTO) + private Integer id; + + private String taskID; + private String jobID; + private String address; + private String transportType; + private String connectorStage; + private Long totalReqNum; + private Long totalTimeCost; + private Long maxTimeCost; + private Long avgTimeCost; + private Double tps; + private Date createTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshWeredisPosition.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshWeredisPosition.java new file mode 100644 index 0000000000..2117230826 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshWeredisPosition.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.entity; + +import java.io.Serializable; +import java.util.Date; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +import lombok.Data; + +/** + * TableName event_mesh_weredis_position + */ +@TableName(value = "event_mesh_weredis_position") +@Data +public class EventMeshWeredisPosition implements Serializable { + @TableId(type = IdType.AUTO) + private Integer id; + + private String jobID; + + // connection run address + private String address; + + private String clusterName; + + private String partitionName; + + private String masterReplid; + + //weredis run host + private String host; + + private Long replOffset = -1L; + + private Date createTime; + + private Date updateTime; + + private static final long serialVersionUID = 1L; +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshMonitorMapper.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshMonitorMapper.java new file mode 100644 index 0000000000..db77224637 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/mapper/EventMeshMonitorMapper.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.mapper; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshMonitor; + +import org.apache.ibatis.annotations.Mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * event_mesh_monitor + * Entity org.apache.eventmesh.admin.server.web.db.entity.EventMeshMonitor + */ +@Mapper +public interface EventMeshMonitorMapper extends BaseMapper { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshMonitorService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshMonitorService.java new file mode 100644 index 0000000000..4180f82a97 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshMonitorService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.service; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshMonitor; + +import com.baomidou.mybatisplus.extension.service.IService; + +/** + * event_mesh_monitor + */ +public interface EventMeshMonitorService extends IService { + +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshTaskInfoService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshTaskInfoService.java index dc35cfe071..04da6a7952 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshTaskInfoService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/EventMeshTaskInfoService.java @@ -18,6 +18,10 @@ package org.apache.eventmesh.admin.server.web.db.service; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo; +import org.apache.eventmesh.common.remote.request.QueryTaskInfoRequest; +import org.apache.eventmesh.common.remote.response.QueryTaskInfoResponse; + +import java.util.List; import com.baomidou.mybatisplus.extension.service.IService; @@ -26,4 +30,8 @@ */ public interface EventMeshTaskInfoService extends IService { + List queryTaskInfo(QueryTaskInfoRequest taskInfoRequest); + + // boolean deleteTaskByTaskID(String taskID); + } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMonitorServiceImpl.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMonitorServiceImpl.java new file mode 100644 index 0000000000..ebb4220000 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMonitorServiceImpl.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.db.service.impl; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshMonitor; +import org.apache.eventmesh.admin.server.web.db.mapper.EventMeshMonitorMapper; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshMonitorService; + +import org.springframework.stereotype.Service; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; + +/** + * event_mesh_monitor + */ +@Service +public class EventMeshMonitorServiceImpl extends ServiceImpl + implements EventMeshMonitorService { + +} + + + + diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshTaskInfoServiceImpl.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshTaskInfoServiceImpl.java index 9568b63671..91acb51a76 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshTaskInfoServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/service/impl/EventMeshTaskInfoServiceImpl.java @@ -17,23 +17,307 @@ package org.apache.eventmesh.admin.server.web.db.service.impl; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo; import org.apache.eventmesh.admin.server.web.db.mapper.EventMeshTaskInfoMapper; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionService; import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService; +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.TaskState; +import org.apache.eventmesh.common.remote.request.QueryTaskInfoRequest; +import org.apache.eventmesh.common.remote.response.QueryTaskInfoResponse; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; + /** * event_mesh_task_info */ +@Slf4j @Service public class EventMeshTaskInfoServiceImpl extends ServiceImpl - implements EventMeshTaskInfoService { + implements EventMeshTaskInfoService { + + @Autowired + private EventMeshTaskInfoMapper taskInfoMapper; + + @Autowired + private EventMeshJobInfoService jobInfoService; + + @Autowired + private EventMeshDataSourceService dataSourceService; + + @Autowired + private EventMeshMysqlPositionService mysqlPositionService; + + @Override + public List queryTaskInfo(QueryTaskInfoRequest taskInfoRequest) { + + log.info("receive query task info request:{}", taskInfoRequest); + + List queryTaskInfoResponseList = new ArrayList<>(); + + Integer currentPage = taskInfoRequest.getCurrentPage(); + Integer pageSize = taskInfoRequest.getPageSize(); + + // query by page + if (StringUtils.isEmpty(taskInfoRequest.getTaskID()) + && currentPage != null + && pageSize != null) { + + Page page = new Page<>(); + page.setCurrent(currentPage); + page.setSize(pageSize); + List eventMeshTaskInfoList = taskInfoMapper.selectPage(page, Wrappers.query() + .ne("taskState", TaskState.DELETE.name())).getRecords(); + queryTaskInfoResponseList = getQueryTaskInfoResponses(eventMeshTaskInfoList, queryTaskInfoResponseList); + + } + + if (StringUtils.isNotEmpty(taskInfoRequest.getTaskID()) || StringUtils.isNotEmpty(taskInfoRequest.getTaskID())) { + queryTaskInfoResponseList = eventMeshTaskInfoList(taskInfoRequest); + } + + // if (StringUtils.isNotEmpty(taskInfoRequest.getJobType())) { + // + // } + // + // if (StringUtils.isNotEmpty(taskInfoRequest.getSourceDataID())) { + // + // } + // + // if (StringUtils.isNotEmpty(taskInfoRequest.getTargetDataID())) { + // + // } + // + // if (StringUtils.isNotEmpty(taskInfoRequest.getIp())) { + // + // } + // + // if (StringUtils.isNotEmpty(taskInfoRequest.getSourceTableName())) { + // + // } + // + // if (StringUtils.isNotEmpty(taskInfoRequest.getTaskMathID())) { + // + // } + + log.info("query event mesh task info response result:{}", queryTaskInfoResponseList); + + return queryTaskInfoResponseList; + } + + @Transactional + private List eventMeshTaskInfoList(QueryTaskInfoRequest taskInfoRequest) { + + List eventMeshTaskInfoList = new ArrayList<>(); + + Page page = new Page<>(); + page.setCurrent(taskInfoRequest.getCurrentPage()); + page.setSize(taskInfoRequest.getPageSize()); + + if (StringUtils.isNotEmpty(taskInfoRequest.getTaskID())) { + eventMeshTaskInfoList = taskInfoMapper.selectPage(page, Wrappers.query() + .eq("taskID", taskInfoRequest.getTaskID()) + .ne("taskState", TaskState.DELETE.name())) + .getRecords(); + } + + if (StringUtils.isNotEmpty(taskInfoRequest.getTaskDesc())) { + eventMeshTaskInfoList = taskInfoMapper.selectPage(page, Wrappers.query() + .like("taskDesc", taskInfoRequest.getTaskDesc()) + .ne("jobState", JobState.DELETE.name())) + .getRecords(); + } + + List eventMeshTaskInfos = new ArrayList<>(); + + List queryTaskInfoResponse = getQueryTaskInfoResponses(eventMeshTaskInfoList, eventMeshTaskInfos); + log.info("query task info result queryTaskInfoResponse:{}", queryTaskInfoResponse); + + return queryTaskInfoResponse; + } + + private List getQueryTaskInfoResponses(List eventMeshTaskInfoList, + List eventMeshTaskInfos) { + + for (EventMeshTaskInfo meshTaskInfo : eventMeshTaskInfoList) { + QueryTaskInfoResponse eventMeshTaskInfo = initEventMeshTaskInfo(meshTaskInfo); + eventMeshTaskInfos.add(eventMeshTaskInfo); + } + + if (!eventMeshTaskInfoList.isEmpty()) { + List eventMeshJobInfoList = new ArrayList<>(); + for (QueryTaskInfoResponse eventMeshTaskInfo : eventMeshTaskInfos) { + List eventMeshJobInfos = jobInfoService.list(Wrappers.query() + .eq("taskID", eventMeshTaskInfo.getTaskID()) + .ne("jobState", JobState.DELETE.name())); + + for (EventMeshJobInfo eventMeshJobInfo : eventMeshJobInfos) { + QueryTaskInfoResponse.EventMeshJobInfo eventMeshJobInfoCovert = initEventMeshJobInfo(eventMeshJobInfo); + eventMeshJobInfoList.add(eventMeshJobInfoCovert); + } + + if (!eventMeshJobInfoList.isEmpty()) { + for (QueryTaskInfoResponse.EventMeshJobInfo eventMeshJobInfo : eventMeshJobInfoList) { + QueryTaskInfoResponse.EventMeshDataSource dataSource = covertEventMeshDataSource( + querySourceOrSinkData(eventMeshJobInfo.getSourceData())); + QueryTaskInfoResponse.EventMeshDataSource dataSink = covertEventMeshDataSource( + querySourceOrSinkData(eventMeshJobInfo.getTargetData())); + + EventMeshMysqlPosition eventMeshMysqlPosition = mysqlPositionService.getOne(Wrappers.query().eq( + "jobID", + eventMeshJobInfo.getJobID() + )); + + + QueryTaskInfoResponse.EventMeshMysqlPosition mysqlPosition = covertEventMeshMysqlPosition(eventMeshMysqlPosition); + + eventMeshJobInfo.setEventMeshMysqlPosition(mysqlPosition); + eventMeshJobInfo.setDataSource(dataSource); + eventMeshJobInfo.setDataSink(dataSink); + } + } + + // set job info to same taskID + eventMeshTaskInfo.setEventMeshJobInfoList(eventMeshJobInfoList); + } + } + + List queryTaskInfoResponse = new ArrayList<>(); + if (!eventMeshTaskInfos.isEmpty()) { + queryTaskInfoResponse.addAll(eventMeshTaskInfos); + } + + return queryTaskInfoResponse; + } + + /** + * QueryTaskInfoResponse.EventMeshDataSource covert + * + * @param eventMeshData EventMeshDataSource + * @return meshData + */ + private static QueryTaskInfoResponse.EventMeshDataSource covertEventMeshDataSource(EventMeshDataSource eventMeshData) { + QueryTaskInfoResponse.EventMeshDataSource meshData = new QueryTaskInfoResponse.EventMeshDataSource(); + if (ObjectUtils.isEmpty(eventMeshData)) { + return null; + } + meshData.setId(eventMeshData.getId()); + meshData.setDataType(eventMeshData.getDataType()); + meshData.setConfiguration(eventMeshData.getConfiguration()); + meshData.setConfigurationClass(eventMeshData.getConfigurationClass()); + meshData.setDescription(eventMeshData.getDescription()); + meshData.setRegion(eventMeshData.getRegion()); + meshData.setCreateUid(eventMeshData.getCreateUid()); + meshData.setUpdateUid(eventMeshData.getUpdateUid()); + meshData.setCreateTime(eventMeshData.getCreateTime()); + meshData.setUpdateTime(eventMeshData.getUpdateTime()); + return meshData; + } -} + /** + * getSourceOrSinkData + * + * @param id id + * @return EventMeshDataSource + */ + private EventMeshDataSource querySourceOrSinkData(Integer id) { + return dataSourceService.getOne(Wrappers.query().eq( + "id", + id)); + } + /** + * QueryTaskInfoResponse.EventMeshMysqlPosition + * + * @param mysqlPosition EventMeshMysqlPosition + * @return position + */ + private static QueryTaskInfoResponse.EventMeshMysqlPosition covertEventMeshMysqlPosition(EventMeshMysqlPosition mysqlPosition) { + QueryTaskInfoResponse.EventMeshMysqlPosition position = new QueryTaskInfoResponse.EventMeshMysqlPosition(); + if (ObjectUtils.isEmpty(mysqlPosition)) { + return null; + } + position.setId(mysqlPosition.getId()); + position.setJobID(mysqlPosition.getJobID()); + position.setServerUUID(mysqlPosition.getServerUUID()); + position.setAddress(mysqlPosition.getAddress()); + position.setPosition(mysqlPosition.getPosition()); + position.setGtid(mysqlPosition.getGtid()); + position.setCurrentGtid(mysqlPosition.getCurrentGtid()); + position.setTimestamp(mysqlPosition.getTimestamp()); + position.setJournalName(mysqlPosition.getJournalName()); + position.setCreateTime(mysqlPosition.getCreateTime()); + position.setUpdateTime(mysqlPosition.getUpdateTime()); + return position; + } + /** + * EventMeshJobInfo covert + * + * @param eventMeshJobInfo EventMeshJobInfo + * @return QueryTaskInfoResponse.EventMeshJobInfo + */ + private static QueryTaskInfoResponse.EventMeshJobInfo initEventMeshJobInfo(EventMeshJobInfo eventMeshJobInfo) { + QueryTaskInfoResponse.EventMeshJobInfo eventMeshJobInfoCovert = new QueryTaskInfoResponse.EventMeshJobInfo(); + if (ObjectUtils.isEmpty(eventMeshJobInfo)) { + return null; + } + eventMeshJobInfoCovert.setId(eventMeshJobInfo.getId()); + eventMeshJobInfoCovert.setJobID(eventMeshJobInfo.getJobID()); + eventMeshJobInfoCovert.setJobDesc(eventMeshJobInfo.getJobDesc()); + eventMeshJobInfoCovert.setTaskID(eventMeshJobInfo.getTaskID()); + eventMeshJobInfoCovert.setTransportType(eventMeshJobInfo.getTransportType()); + eventMeshJobInfoCovert.setSourceData(eventMeshJobInfo.getSourceData()); + eventMeshJobInfoCovert.setTargetData(eventMeshJobInfo.getTargetData()); + eventMeshJobInfoCovert.setJobState(eventMeshJobInfo.getJobState()); + eventMeshJobInfoCovert.setJobType(eventMeshJobInfo.getJobType()); + eventMeshJobInfoCovert.setFromRegion(eventMeshJobInfo.getFromRegion()); + eventMeshJobInfoCovert.setRunningRegion(eventMeshJobInfo.getRunningRegion()); + eventMeshJobInfoCovert.setCreateUid(eventMeshJobInfo.getCreateUid()); + eventMeshJobInfoCovert.setUpdateUid(eventMeshJobInfo.getUpdateUid()); + eventMeshJobInfoCovert.setCreateTime(eventMeshJobInfo.getCreateTime()); + eventMeshJobInfoCovert.setUpdateTime(eventMeshJobInfo.getUpdateTime()); + return eventMeshJobInfoCovert; + } + /** + * EventMeshTaskInfo covert + * + * @param meshTaskInfo EventMeshTaskInfo + * @return QueryTaskInfoResponse + */ + private static QueryTaskInfoResponse initEventMeshTaskInfo(EventMeshTaskInfo meshTaskInfo) { + QueryTaskInfoResponse eventMeshTaskInfo = new QueryTaskInfoResponse(); + eventMeshTaskInfo.setId(meshTaskInfo.getId()); + eventMeshTaskInfo.setTaskID(meshTaskInfo.getTaskID()); + eventMeshTaskInfo.setTaskDesc(meshTaskInfo.getTaskDesc()); + eventMeshTaskInfo.setTaskState(meshTaskInfo.getTaskState()); + eventMeshTaskInfo.setSourceRegion(meshTaskInfo.getSourceRegion()); + eventMeshTaskInfo.setTargetRegion(meshTaskInfo.getTargetRegion()); + eventMeshTaskInfo.setCreateUid(meshTaskInfo.getCreateUid()); + eventMeshTaskInfo.setUpdateUid(meshTaskInfo.getUpdateUid()); + eventMeshTaskInfo.setCreateTime(meshTaskInfo.getCreateTime()); + eventMeshTaskInfo.setUpdateTime(meshTaskInfo.getUpdateTime()); + return eventMeshTaskInfo; + } +} \ No newline at end of file diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java index ea836ce7aa..c876014f63 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java @@ -19,19 +19,29 @@ import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; +import org.apache.eventmesh.admin.server.web.pojo.TaskDetail; import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; +import org.apache.eventmesh.admin.server.web.service.position.PositionBizService; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.TransportType; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.common.remote.request.RecordPositionRequest; import org.apache.eventmesh.common.remote.request.ReportJobRequest; import org.apache.eventmesh.common.remote.response.SimpleResponse; import org.apache.commons.lang3.StringUtils; +import java.util.List; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import lombok.extern.slf4j.Slf4j; + @Component @Slf4j public class ReportJobRequestHandler extends BaseRequestHandler { @@ -39,6 +49,9 @@ public class ReportJobRequestHandler extends BaseRequestHandler recordPositionList = + positionBizService.getPositionByJobID(taskDetail.getIncreaseTask().getJobID(), DataSourceType.MYSQL); + if (!recordPositionList.isEmpty()) { + log.info("skip record position because of increase job has exist position.jobID:{},position list size:{}", jobInfo.getJobID(), + recordPositionList.size()); + return true; + } + + RecordPositionRequest recordPositionRequest = new RecordPositionRequest(); + recordPositionRequest.setFullJobID(taskDetail.getFullTask().getJobID()); + recordPositionRequest.setIncreaseJobID(taskDetail.getIncreaseTask().getJobID()); + recordPositionRequest.setUpdateState(request.getState()); + recordPositionRequest.setAddress(request.getAddress()); + TransportType currentTransportType = TransportType.getTransportType(jobInfo.getTransportType()); + recordPositionRequest.setDataSourceType(currentTransportType.getSrc()); + return positionBizService.recordPosition(recordPositionRequest, metadata); + } + } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportMonitorHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportMonitorHandler.java new file mode 100644 index 0000000000..a36939bb88 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportMonitorHandler.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.handler.impl; + +import org.apache.eventmesh.admin.server.AdminServerProperties; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; +import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; +import org.apache.eventmesh.admin.server.web.service.monitor.MonitorBizService; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.ReportMonitorRequest; +import org.apache.eventmesh.common.remote.response.SimpleResponse; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import lombok.extern.slf4j.Slf4j; + +@Component +@Slf4j +public class ReportMonitorHandler extends BaseRequestHandler { + + @Autowired + private MonitorBizService monitorService; + + @Autowired + JobInfoBizService jobInfoBizService; + + @Autowired + private AdminServerProperties properties; + + @Override + protected SimpleResponse handler(ReportMonitorRequest request, Metadata metadata) { + if (StringUtils.isAnyBlank(request.getTaskID(), request.getJobID(), request.getAddress())) { + log.info("report monitor request [{}] illegal", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task id,job id or address is none"); + } + + String jobID = request.getJobID(); + EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(jobID); + if (jobInfo == null || StringUtils.isBlank(jobInfo.getFromRegion())) { + log.info("report monitor job info [{}] illegal", request); + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "job info is null or fromRegion is blank,job id:" + jobID); + } + String fromRegion = jobInfo.getFromRegion(); + String transportType = jobInfo.getTransportType(); + if (StringUtils.isEmpty(request.getTransportType())) { + request.setTransportType(transportType); + } + String localRegion = properties.getRegion(); + log.info("report monitor request from region:{},localRegion:{},request:{}", fromRegion, localRegion, request); + if (fromRegion.equalsIgnoreCase(localRegion)) { + return monitorService.reportMonitorRecord(request) ? SimpleResponse.success() : + SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save monitor " + + "request fail"); + } else { + List adminServerList = Arrays.asList(properties.getAdminServerList().get(fromRegion).split(";")); + if (adminServerList == null || adminServerList.isEmpty()) { + throw new RuntimeException("No admin server available for region: " + fromRegion); + } + String targetUrl = adminServerList.get(new Random().nextInt(adminServerList.size())) + "/eventmesh/admin/reportMonitor"; + log.info("start transfer monitor request to from region admin server. from region:{}, targetUrl:{}", fromRegion, targetUrl); + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity response = restTemplate.postForEntity(targetUrl, request, String.class); + if (!response.getStatusCode().is2xxSuccessful()) { + log.error("transfer monitor request to from region admin server error. from region:{}, targetUrl:{}", fromRegion, targetUrl); + return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, + "save monitor request fail,code:" + response.getStatusCode() + ",msg:" + response.getBody()); + } + return SimpleResponse.success(); + } + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java index 9844f47c6a..e7f1d1257f 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java @@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils; +import java.util.Arrays; import java.util.List; import java.util.Random; @@ -75,7 +76,7 @@ protected SimpleResponse handler(ReportVerifyRequest request, Metadata metadata) + "request fail"); } else { log.info("start transfer report verify to from region admin server. from region:{}", fromRegion); - List adminServerList = properties.getAdminServerList().get(fromRegion); + List adminServerList = Arrays.asList(properties.getAdminServerList().get(fromRegion).split(";")); if (adminServerList == null || adminServerList.isEmpty()) { throw new RuntimeException("No admin server available for region: " + fromRegion); } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/BinlogPosition.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/BinlogPosition.java new file mode 100644 index 0000000000..5bd8daab10 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/BinlogPosition.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.pojo; + + +import lombok.Data; + +@Data +public class BinlogPosition { + private String file; + private Long position; +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/TaskDetail.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/TaskDetail.java index 86f5342f35..2b174209e2 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/TaskDetail.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/pojo/TaskDetail.java @@ -17,8 +17,18 @@ package org.apache.eventmesh.admin.server.web.pojo; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; + +import lombok.Data; + /** * Description: */ +@Data public class TaskDetail { + + private EventMeshJobInfo fullTask; + + private EventMeshJobInfo increaseTask; + } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java index 76df629e69..c200d9801a 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java @@ -28,6 +28,7 @@ import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; import org.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService; import org.apache.eventmesh.admin.server.web.pojo.JobDetail; +import org.apache.eventmesh.admin.server.web.pojo.TaskDetail; import org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService; import org.apache.eventmesh.admin.server.web.service.position.PositionBizService; import org.apache.eventmesh.common.config.connector.Config; @@ -171,7 +172,7 @@ public List createJobs(List jobs) { entityList.add(entity); } int changed = jobInfoExtService.batchSave(entityList); - if (changed != jobs.size()) { + if (changed != entityList.size()) { throw new AdminServerRuntimeException(ErrorCode.INTERNAL_ERR, String.format("create [%d] jobs of not match expect [%d]", changed, jobs.size())); } @@ -241,8 +242,14 @@ public EventMeshJobInfo getJobInfo(String jobID) { if (jobID == null) { return null; } - EventMeshJobInfo job = jobInfoService.getOne(Wrappers.query().eq("jobID", jobID)); - return job; + return jobInfoService.getOne(Wrappers.query().eq("jobID", jobID)); + } + + public List getJobsByTaskID(String taskID) { + if (taskID == null) { + return null; + } + return jobInfoService.list(Wrappers.query().eq("taskID", taskID)); } public void checkJobInfo() { @@ -253,19 +260,41 @@ public void checkJobInfo() { if (StringUtils.isEmpty(jobID)) { continue; } - EventMeshRuntimeHeartbeat heartbeat = heartbeatService.getOne(Wrappers.query().eq("jobID", jobID)); - if (heartbeat == null) { + List heartbeatList = heartbeatService.list((Wrappers.query().eq("jobID", jobID))); + if (heartbeatList == null || heartbeatList.size() == 0) { continue; } // if last heart beat update time have delay three period.print job heart beat delay warn long currentTimeStamp = System.currentTimeMillis(); - if (currentTimeStamp - heartbeat.getUpdateTime().getTime() > 3 * heatBeatPeriod) { + if (currentTimeStamp - heartbeatList.get(0).getUpdateTime().getTime() > 3 * heatBeatPeriod) { log.warn("current job heart heart has delay.jobID:{},currentTimeStamp:{},last update time:{}", jobID, currentTimeStamp, - heartbeat.getUpdateTime()); + heartbeatList.get(0).getUpdateTime()); } } } + public TaskDetail getTaskDetail(String taskID, DataSourceType dataSourceType) { + TaskDetail taskDetail = new TaskDetail(); + List jobInfoList = getJobsByTaskID(taskID); + if (jobInfoList == null || jobInfoList.size() == 0) { + return taskDetail; + } + for (EventMeshJobInfo jobInfo : jobInfoList) { + TransportType currentTransportType = TransportType.getTransportType(jobInfo.getTransportType()); + JobType jobType = JobType.fromIndex(jobInfo.getJobType()); + if (currentTransportType.getSrc().equals(dataSourceType)) { + if (jobType.name().equalsIgnoreCase(JobType.FULL.name())) { + taskDetail.setFullTask(jobInfo); + } + if (jobType.name().equalsIgnoreCase(JobType.INCREASE.name())) { + taskDetail.setIncreaseTask(jobInfo); + } + } + } + return taskDetail; + } + + } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/monitor/MonitorBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/monitor/MonitorBizService.java new file mode 100644 index 0000000000..3377334144 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/monitor/MonitorBizService.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.service.monitor; + +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshMonitor; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshMonitorService; +import org.apache.eventmesh.common.remote.request.QueryTaskMonitorRequest; +import org.apache.eventmesh.common.remote.request.ReportMonitorRequest; +import org.apache.eventmesh.common.remote.response.QueryTaskMonitorResponse; +import org.apache.eventmesh.common.remote.task.TaskMonitor; +import org.apache.eventmesh.common.utils.JsonUtils; + +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.metadata.OrderItem; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +import lombok.extern.slf4j.Slf4j; + +@Service +@Slf4j +public class MonitorBizService { + + @Autowired + private EventMeshMonitorService monitorService; + + public boolean reportMonitorRecord(ReportMonitorRequest request) { + EventMeshMonitor monitor = new EventMeshMonitor(); + monitor.setTaskID(request.getTaskID()); + monitor.setJobID(request.getJobID()); + monitor.setAddress(request.getAddress()); + monitor.setTransportType(request.getTransportType()); + monitor.setConnectorStage(request.getConnectorStage()); + monitor.setTotalReqNum(request.getTotalReqNum()); + monitor.setTotalTimeCost(request.getTotalTimeCost()); + monitor.setMaxTimeCost(request.getMaxTimeCost()); + monitor.setAvgTimeCost(request.getAvgTimeCost()); + monitor.setTps(request.getTps()); + return monitorService.save(monitor); + } + + public QueryTaskMonitorResponse queryTaskMonitors(QueryTaskMonitorRequest request) { + if (StringUtils.isBlank(request.getTaskID())) { + throw new RuntimeException("task id is empty"); + } + long limit = request.getLimit(); + if (limit <= 0) { + log.info("query task monitor limit:{},use default value:{}", limit, 10); + limit = 10; + } + + Page queryPage = new Page<>(); + queryPage.setCurrent(1); + queryPage.setSize(limit); + queryPage.addOrder(OrderItem.desc("createTime")); + + QueryWrapper queryWrapper = new QueryWrapper(); + queryWrapper.eq("taskID", request.getTaskID()); + if (StringUtils.isNotEmpty(request.getJobID())) { + queryWrapper.eq("jobID", request.getJobID()); + } + List eventMeshMonitors = monitorService.list(queryPage, queryWrapper); + List taskMonitorList = new ArrayList<>(); + if (eventMeshMonitors != null) { + log.info("query event mesh monitor size:{}", eventMeshMonitors.size()); + if (log.isDebugEnabled()) { + log.debug("query event mesh monitor content:{}", JsonUtils.toJSONString(eventMeshMonitors)); + } + for (EventMeshMonitor eventMeshMonitor : eventMeshMonitors) { + TaskMonitor monitor = new TaskMonitor(); + monitor.setTaskID(eventMeshMonitor.getTaskID()); + monitor.setJobID(eventMeshMonitor.getJobID()); + monitor.setAddress(eventMeshMonitor.getAddress()); + monitor.setTransportType(eventMeshMonitor.getTransportType()); + monitor.setConnectorStage(eventMeshMonitor.getConnectorStage()); + monitor.setTotalReqNum(eventMeshMonitor.getTotalReqNum()); + monitor.setTotalTimeCost(eventMeshMonitor.getTotalTimeCost()); + monitor.setMaxTimeCost(eventMeshMonitor.getMaxTimeCost()); + monitor.setAvgTimeCost(eventMeshMonitor.getAvgTimeCost()); + monitor.setTps(eventMeshMonitor.getTps()); + monitor.setCreateTime(eventMeshMonitor.getCreateTime()); + taskMonitorList.add(monitor); + } + } + QueryTaskMonitorResponse queryTaskMonitorResponse = new QueryTaskMonitorResponse(); + queryTaskMonitorResponse.setTaskMonitors(taskMonitorList); + return queryTaskMonitorResponse; + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IRecordPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IRecordPositionHandler.java new file mode 100644 index 0000000000..fa38e14320 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IRecordPositionHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.service.position; + +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.request.RecordPositionRequest; + +/** + * IRecordPositionHandler + */ +public interface IRecordPositionHandler { + + boolean handler(RecordPositionRequest request, Metadata metadata); + +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionBizService.java index c40fc9e7e5..0c4cd7a423 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionBizService.java @@ -23,6 +23,7 @@ import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import org.apache.eventmesh.common.remote.request.RecordPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import java.util.List; @@ -80,4 +81,10 @@ public List getPositionByJobID(String jobID, DataSourceType type request.setJobID(jobID); return handler.handler(request, null); } + + public boolean recordPosition(RecordPositionRequest request, Metadata metadata) { + isValidatePositionRequest(request.getDataSourceType()); + IRecordPositionHandler handler = factory.getHandler(request.getDataSourceType()); + return handler.handler(request, metadata); + } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandler.java index e09c1a3837..9cbaf3fad6 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/PositionHandler.java @@ -19,7 +19,7 @@ import org.apache.eventmesh.common.remote.datasource.DataSourceType; -public abstract class PositionHandler implements IReportPositionHandler, IFetchPositionHandler { +public abstract class PositionHandler implements IReportPositionHandler, IFetchPositionHandler, IRecordPositionHandler { protected abstract DataSourceType getSourceType(); } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java index b8d536f388..a58fa31c07 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java @@ -23,6 +23,7 @@ import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import org.apache.eventmesh.common.remote.request.RecordPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import java.util.ArrayList; @@ -58,4 +59,9 @@ public List handler(FetchPositionRequest request, Metadata metad List recordPositionList = new ArrayList<>(); return recordPositionList; } + + @Override + public boolean handler(RecordPositionRequest request, Metadata metadata) { + return true; + } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java index 352ba57e96..8545078d80 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java @@ -21,18 +21,27 @@ import org.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory; import org.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionService; import org.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService; +import org.apache.eventmesh.admin.server.web.pojo.BinlogPosition; +import org.apache.eventmesh.admin.server.web.pojo.JobDetail; +import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; import org.apache.eventmesh.admin.server.web.service.position.PositionHandler; +import org.apache.eventmesh.admin.server.web.utils.JdbcUtils; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig; +import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceFullConfig; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset; import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import org.apache.eventmesh.common.remote.request.RecordPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.commons.lang3.StringUtils; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -42,6 +51,7 @@ import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; +import com.alibaba.druid.pool.DruidDataSource; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.extern.slf4j.Slf4j; @@ -50,6 +60,12 @@ @Slf4j public class MysqlPositionHandler extends PositionHandler { private static final int RETRY_TIMES = 3; + private static final String SQL_SELECT_RDB_VERSION = "select version() as rdb_version"; + private static final String SQL_SHOW_BINLOG_POSITION = "SHOW MASTER STATUS"; + private static final String SQL_SELECT_SERVER_UUID_IN_MARIADB = "SELECT @@global.server_id as server_uuid"; + private static final String SQL_SHOW_SERVER_UUID_IN_MYSQL = "SELECT @@server_uuid as server_uuid"; + private static final String SQL_SELECT_GTID_IN_MARIADB = "SELECT @@global.gtid_binlog_pos as gtid"; + private static final String SQL_SELECT_GTID_IN_MYSQL = "SELECT @@gtid_executed as gtid"; private final long retryPeriod = Duration.ofMillis(500).toNanos(); @@ -59,6 +75,9 @@ public class MysqlPositionHandler extends PositionHandler { @Autowired EventMeshPositionReporterHistoryService historyService; + @Autowired + JobInfoBizService jobInfoBizService; + @Override protected DataSourceType getSourceType() { return DataSourceType.MYSQL; @@ -67,8 +86,8 @@ protected DataSourceType getSourceType() { private boolean isNotForward(EventMeshMysqlPosition now, EventMeshMysqlPosition old) { if (StringUtils.isNotBlank(old.getJournalName()) && old.getJournalName().equals(now.getJournalName()) && old.getPosition() >= now.getPosition()) { - log.info("job [{}] report position [{}] by runtime [{}] less than db position [{}] journal name [{}] by [{}]", - now.getJobID(), now.getPosition(), now.getAddress(), now.getJournalName(), old.getPosition(), old.getAddress()); + log.info("job [{}] report position [{}] by runtime [{}] less than db position [{}] journal name [{}] by [{}]", now.getJobID(), + now.getPosition(), now.getAddress(), now.getJournalName(), old.getPosition(), old.getAddress()); return true; } return false; @@ -76,8 +95,7 @@ private boolean isNotForward(EventMeshMysqlPosition now, EventMeshMysqlPosition public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { for (int i = 0; i < RETRY_TIMES; i++) { - EventMeshMysqlPosition old = positionService.getOne(Wrappers.query().eq("jobId", - position.getJobID())); + EventMeshMysqlPosition old = positionService.getOne(Wrappers.query().eq("jobId", position.getJobID())); if (old == null) { try { return positionService.save(position); @@ -95,8 +113,8 @@ public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { return true; } try { - if (!positionService.update(position, Wrappers.update().eq("updateTime", - old.getUpdateTime()).eq("jobID", old.getJobID()))) { + if (!positionService.update(position, + Wrappers.update().eq("updateTime", old.getUpdateTime()).eq("jobID", old.getJobID()))) { log.warn("update position [{}] fail, maybe current update. it will retry in 500ms", position); LockSupport.parkNanos(retryPeriod); continue; @@ -123,7 +141,6 @@ public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) { @Override public boolean handler(ReportPositionRequest request, Metadata metadata) { - try { List recordPositionList = request.getRecordPositionList(); RecordPosition recordPosition = recordPositionList.get(0); @@ -170,8 +187,7 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) { @Override public List handler(FetchPositionRequest request, Metadata metadata) { - List positionList = positionService.list(Wrappers.query().eq("jobID", - request.getJobID())); + List positionList = positionService.list(Wrappers.query().eq("jobID", request.getJobID())); List recordPositionList = new ArrayList<>(); for (EventMeshMysqlPosition position : positionList) { CanalRecordPartition partition = new CanalRecordPartition(); @@ -189,4 +205,147 @@ public List handler(FetchPositionRequest request, Metadata metad } return recordPositionList; } + + @Override + public boolean handler(RecordPositionRequest request, Metadata metadata) { + try { + String fullJobID = request.getFullJobID(); + String increaseJobID = request.getIncreaseJobID(); + log.info("start record full job position to increase job position,full jobID:{}, increase jobID:{}.", fullJobID, increaseJobID); + JobDetail fullJobDetail = jobInfoBizService.getJobDetail(fullJobID); + CanalSourceConfig canalSourceConfig = (CanalSourceConfig) fullJobDetail.getSourceDataSource().getConf(); + CanalSourceFullConfig canalSourceFullConfig = JsonUtils.mapToObject(canalSourceConfig.getSourceConfig(), CanalSourceFullConfig.class); + try (DruidDataSource druidDataSource = JdbcUtils.createDruidDataSource(canalSourceFullConfig.getSourceConnectorConfig().getUrl(), + canalSourceFullConfig.getSourceConnectorConfig().getUserName(), canalSourceFullConfig.getSourceConnectorConfig().getPassWord())) { + + DataSourceType dataSourceType = checkRDBDataSourceType(druidDataSource); + + ReportPositionRequest reportPositionRequest = new ReportPositionRequest(); + reportPositionRequest.setJobID(increaseJobID); + reportPositionRequest.setDataSourceType(DataSourceType.MYSQL); + reportPositionRequest.setAddress(request.getAddress()); + + RecordPosition recordPosition = new RecordPosition(); + CanalRecordOffset recordOffset = new CanalRecordOffset(); + BinlogPosition binlogPosition = queryBinlogPosition(druidDataSource); + String gtid = queryGTID(druidDataSource, dataSourceType); + recordOffset.setOffset(binlogPosition.getPosition()); + recordOffset.setGtid(gtid); + recordPosition.setRecordOffset(recordOffset); + + CanalRecordPartition recordPartition = new CanalRecordPartition(); + String serverUUID = queryServerUUID(druidDataSource, dataSourceType); + recordPartition.setJournalName(binlogPosition.getFile()); + recordPartition.setServerUUID(serverUUID); + recordPosition.setRecordPartition(recordPartition); + + List recordPositions = new ArrayList<>(); + recordPositions.add(recordPosition); + + reportPositionRequest.setRecordPositionList(recordPositions); + log.info("start store increase task position,jobID:{},request:{}", increaseJobID, reportPositionRequest); + handler(reportPositionRequest, metadata); + } + return true; + } catch (Exception e) { + log.error("record full job position to increase job position failed.", e); + return false; + } + } + + private DataSourceType checkRDBDataSourceType(DruidDataSource druidDataSource) { + try { + log.info("execute sql '{}' start.", SQL_SELECT_RDB_VERSION); + try (PreparedStatement preparedStatement = druidDataSource.getConnection().prepareStatement(SQL_SELECT_RDB_VERSION)) { + ResultSet resultSet = preparedStatement.executeQuery(); + if (resultSet.next()) { + log.info("execute sql '{}' result:{}", SQL_SELECT_RDB_VERSION, resultSet); + String rdbVersion = resultSet.getString("rdb_version"); + if (StringUtils.isNotBlank(rdbVersion)) { + if (rdbVersion.toLowerCase().contains(DataSourceType.MariaDB.getName().toLowerCase())) { + return DataSourceType.MariaDB; + } + } + } + } + } catch (Exception e) { + log.warn("select rdb version failed,data source:{}", druidDataSource, e); + throw new RuntimeException("select rdb version failed"); + } + return DataSourceType.MYSQL; + } + + private BinlogPosition queryBinlogPosition(DruidDataSource druidDataSource) { + BinlogPosition binlogPosition = new BinlogPosition(); + try { + log.info("execute sql '{}' start.", SQL_SHOW_BINLOG_POSITION); + try (PreparedStatement preparedStatement = druidDataSource.getConnection().prepareStatement(SQL_SHOW_BINLOG_POSITION)) { + ResultSet resultSet = preparedStatement.executeQuery(); + if (resultSet.next()) { + log.info("execute sql '{}' result:{}", SQL_SELECT_RDB_VERSION, resultSet); + String fileName = resultSet.getString("File"); + Long position = resultSet.getLong("Position"); + binlogPosition.setFile(fileName); + binlogPosition.setPosition(position); + } + } + } catch (Exception e) { + log.warn("show binlog position failed,data source:{}", druidDataSource, e); + throw new RuntimeException("show binlog position failed"); + } + return binlogPosition; + } + + private String queryServerUUID(DruidDataSource druidDataSource, DataSourceType dataSourceType) { + String serverUUID = ""; + try { + String queryServerUUIDSql; + if (DataSourceType.MariaDB.equals(dataSourceType)) { + queryServerUUIDSql = SQL_SELECT_SERVER_UUID_IN_MARIADB; + } else { + queryServerUUIDSql = SQL_SHOW_SERVER_UUID_IN_MYSQL; + } + log.info("execute sql '{}' start.", queryServerUUIDSql); + try (PreparedStatement preparedStatement = druidDataSource.getConnection().prepareStatement(queryServerUUIDSql)) { + ResultSet resultSet = preparedStatement.executeQuery(); + if (resultSet.next()) { + log.info("execute sql '{}' result:{}", queryServerUUIDSql, resultSet); + serverUUID = resultSet.getString("server_uuid"); + log.info("execute sql '{}',query server_uuid result:{}", queryServerUUIDSql, serverUUID); + return serverUUID; + } + } + } catch (Exception e) { + log.warn("select server_uuid failed,data source:{}", druidDataSource, e); + throw new RuntimeException("select server_uuid failed"); + } + return serverUUID; + } + + private String queryGTID(DruidDataSource druidDataSource, DataSourceType dataSourceType) { + String gitd = ""; + try { + String queryGTIDSql; + if (DataSourceType.MariaDB.equals(dataSourceType)) { + queryGTIDSql = SQL_SELECT_GTID_IN_MARIADB; + } else { + queryGTIDSql = SQL_SELECT_GTID_IN_MYSQL; + } + log.info("execute sql '{}' start.", queryGTIDSql); + try (PreparedStatement preparedStatement = druidDataSource.getConnection().prepareStatement(queryGTIDSql)) { + ResultSet resultSet = preparedStatement.executeQuery(); + if (resultSet.next()) { + log.info("execute sql '{}' result:{}", queryGTIDSql, resultSet); + gitd = resultSet.getString("gtid"); + log.info("execute sql '{}',select gitd result:{}", queryGTIDSql, gitd); + return gitd; + } + } + } catch (Exception e) { + log.warn("select gtid failed,data source:{}", druidDataSource, e); + // when db server not open gitd mode, ignore gtid query exception + //throw new RuntimeException("select gtid failed"); + } + return gitd; + } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java index 7bc16ba4ac..d3c7087d47 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java @@ -18,25 +18,33 @@ package org.apache.eventmesh.admin.server.web.service.task; import org.apache.eventmesh.admin.server.AdminServerProperties; -import org.apache.eventmesh.admin.server.web.Response; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo; import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService; import org.apache.eventmesh.admin.server.web.pojo.JobDetail; import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; import org.apache.eventmesh.common.config.connector.Config; +import org.apache.eventmesh.common.exception.EventMeshException; +import org.apache.eventmesh.common.remote.JobState; import org.apache.eventmesh.common.remote.TaskState; import org.apache.eventmesh.common.remote.datasource.DataSource; import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.request.CreateTaskRequest; +import org.apache.eventmesh.common.remote.request.QueryTaskInfoRequest; +import org.apache.eventmesh.common.remote.request.TaskBachRequest; +import org.apache.eventmesh.common.remote.request.TaskIDRequest; import org.apache.eventmesh.common.remote.response.CreateTaskResponse; +import org.apache.eventmesh.common.remote.response.QueryTaskInfoResponse; +import org.apache.eventmesh.common.remote.response.SimpleResponse; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.UUID; import java.util.stream.Collectors; @@ -47,6 +55,11 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.web.client.RestTemplate; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j @Service public class TaskBizService { @@ -81,7 +94,7 @@ public CreateTaskResponse createTask(CreateTaskRequest req) { String remoteResponse = ""; // not from other admin && target not equals with self region if (!req.isFlag() && !properties.getRegion().equals(targetRegion)) { - List adminServerList = properties.getAdminServerList().get(targetRegion); + List adminServerList = Arrays.asList(properties.getAdminServerList().get(targetRegion).split(";")); if (adminServerList == null || adminServerList.isEmpty()) { throw new RuntimeException("No admin server available for region: " + targetRegion); } @@ -165,11 +178,183 @@ private CreateTaskResponse buildCreateTaskResponse(String taskId, Listquery() + .eq("taskID", taskIDRequest.getTaskID())); + + if (Objects.isNull(taskInfoServiceOne)) { + throw new EventMeshException("task not found"); + } + + if (TaskState.DELETE.name().equals(taskInfoServiceOne.getTaskState())) { + throw new EventMeshException("task already deleted"); + } + + // update task state + taskInfoService.update(Wrappers.update() + .eq("id", taskInfoServiceOne.getId()) + .set("taskState", TaskState.RUNNING.name())); + + List eventMeshJobInfos = jobInfoService.getJobsByTaskID(taskIDRequest.getTaskID()); + + for (EventMeshJobInfo eventMeshJobInfo : eventMeshJobInfos) { + // update job state by jonID + jobInfoService.updateJobState(eventMeshJobInfo.getJobID(), JobState.RUNNING); + } + + // todo: start task job eventmesh-runtime-v2 schedule ? + + } catch (Exception e) { + log.info("start task exception:{}", e.getMessage()); + throw new EventMeshException("start task exception"); + } + } + + @Transactional + public boolean deleteTaskByTaskID(TaskIDRequest taskIDRequest) { + try { + EventMeshTaskInfo taskInfoServiceOne = taskInfoService.getOne(Wrappers.query() + .eq("taskID", taskIDRequest.getTaskID())); + + if (Objects.isNull(taskInfoServiceOne)) { + throw new EventMeshException("task not found"); + } + + if (!TaskState.DELETE.name().equals(taskInfoServiceOne.getTaskState())) { + // update task state to delete + taskInfoService.update(Wrappers.update() + .eq("id", taskInfoServiceOne.getId()) + .set("taskState", TaskState.DELETE.name())); + } + List eventMeshJobInfos = jobInfoService.getJobsByTaskID(taskInfoServiceOne.getTaskID()); + for (EventMeshJobInfo eventMeshJobInfo : eventMeshJobInfos) { + // update job state to delete + jobInfoService.updateJobState(eventMeshJobInfo.getJobID(), JobState.DELETE); + } + // todo: data source config need delete? + + } catch (RuntimeException e) { + log.error("delete task failed:{}", e.getMessage()); + throw new EventMeshException("delete task failed"); + } + return true; + } + + public List queryTaskInfo(QueryTaskInfoRequest taskInfoRequest) { + return taskInfoService.queryTaskInfo(taskInfoRequest); + } + + @Transactional + public void restartTask(TaskIDRequest taskIDRequest) { + try { + EventMeshTaskInfo taskInfoServiceOne = taskInfoService.getOne(Wrappers.query() + .eq("taskID", taskIDRequest.getTaskID()) + .ne("taskState", TaskState.DELETE.name())); + + if (Objects.isNull(taskInfoServiceOne)) { + throw new EventMeshException("task not found"); + } + if (!TaskState.RUNNING.name().equals(taskInfoServiceOne.getTaskState())) { + taskInfoService.update(Wrappers.update() + .eq("id", taskInfoServiceOne.getId()) + .set("taskState", TaskState.RUNNING.name())); + } + List eventMeshJobInfos = jobInfoService.getJobsByTaskID(taskInfoServiceOne.getTaskID()); + for (EventMeshJobInfo eventMeshJobInfo : eventMeshJobInfos) { + // update job state to restart + jobInfoService.updateJobState(eventMeshJobInfo.getJobID(), JobState.RUNNING); + } + // todo: start task job eventmesh-runtime-v2 schedule? + + } catch (RuntimeException e) { + log.error("restart task filed:{}", e.getMessage()); + throw new EventMeshException("restart task filed"); + } + } + + @Transactional + public void stopTask(TaskIDRequest taskIDRequest) { + try { + EventMeshTaskInfo taskInfoServiceOne = taskInfoService.getOne(Wrappers.query() + .eq("taskID", taskIDRequest.getTaskID())); + + if (Objects.isNull(taskInfoServiceOne)) { + throw new EventMeshException("task not found"); + } + if (!TaskState.PAUSE.name().equals(taskInfoServiceOne.getTaskState())) { + taskInfoService.update(Wrappers.update() + .eq("id", taskInfoServiceOne.getId()) + .set("taskState", TaskState.PAUSE.name())); + } + + List eventMeshJobInfos = jobInfoService.getJobsByTaskID(taskInfoServiceOne.getTaskID()); + for (EventMeshJobInfo eventMeshJobInfo : eventMeshJobInfos) { + // update job state to pause + jobInfoService.updateJobState(eventMeshJobInfo.getJobID(), JobState.PAUSE); + } + + // todo: stop task job eventmesh-runtime-v2 schedule? + + } catch (RuntimeException e) { + log.error("stop task filed:{}", e.getMessage()); + throw new EventMeshException("stop task filed"); + } + } + + @Transactional + public void restartBatchTask(List taskIDRequestList, List errorNames) { + for (TaskBachRequest task : taskIDRequestList) { + try { + TaskIDRequest taskIDRequest = new TaskIDRequest(); + taskIDRequest.setTaskID(task.getTaskID()); + startTask(taskIDRequest); + } catch (RuntimeException e) { + log.error("restart batch task failed:{}", e.getMessage()); + errorNames.add(task.getTaskName()); + } + } + } + + @Transactional + public void stopBatchTask(List taskIDRequestList, List errorNames) { + for (TaskBachRequest task : taskIDRequestList) { + try { + TaskIDRequest taskIDRequest = new TaskIDRequest(); + taskIDRequest.setTaskID(task.getTaskID()); + stopTask(taskIDRequest); + } catch (RuntimeException e) { + log.error("stop batch task failed:{}", e.getMessage()); + errorNames.add(task.getTaskName()); + } + } + } + + @Transactional + public void startBatchTask(List taskIDRequestList, List errorNames) { + for (TaskBachRequest task : taskIDRequestList) { + try { + TaskIDRequest taskIDRequest = new TaskIDRequest(); + taskIDRequest.setTaskID(task.getTaskID()); + restartTask(taskIDRequest); + } catch (RuntimeException e) { + log.error("start batch task failed:{}", e.getMessage()); + errorNames.add(task.getTaskName()); + } + } + } + } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/Base64.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/Base64.java new file mode 100644 index 0000000000..f85807b7f9 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/Base64.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.utils; + +public class Base64 { + private static char[] alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=".toCharArray(); + private static byte[] codes = new byte[256]; + + public Base64() { + } + + public static char[] encode(byte[] data) { + char[] out = new char[(data.length + 2) / 3 * 4]; + int i = 0; + + for (int index = 0; i < data.length; index += 4) { + boolean quad = false; + boolean trip = false; + int val = 255 & data[i]; + val <<= 8; + if (i + 1 < data.length) { + val |= 255 & data[i + 1]; + trip = true; + } + + val <<= 8; + if (i + 2 < data.length) { + val |= 255 & data[i + 2]; + quad = true; + } + + out[index + 3] = alphabet[quad ? val & 63 : 64]; + val >>= 6; + out[index + 2] = alphabet[trip ? val & 63 : 64]; + val >>= 6; + out[index + 1] = alphabet[val & 63]; + val >>= 6; + out[index + 0] = alphabet[val & 63]; + i += 3; + } + + return out; + } + + public static byte[] decode(char[] data) { + int tempLen = data.length; + + int len; + for (len = 0; len < data.length; ++len) { + if (data[len] > 255 || codes[data[len]] < 0) { + --tempLen; + } + } + + len = tempLen / 4 * 3; + if (tempLen % 4 == 3) { + len += 2; + } + + if (tempLen % 4 == 2) { + ++len; + } + + byte[] out = new byte[len]; + int shift = 0; + int accum = 0; + int index = 0; + + for (int ix = 0; ix < data.length; ++ix) { + int value = data[ix] > 255 ? -1 : codes[data[ix]]; + if (value >= 0) { + accum <<= 6; + shift += 6; + accum |= value; + if (shift >= 8) { + shift -= 8; + out[index++] = (byte) (accum >> shift & 255); + } + } + } + + if (index != out.length) { + throw new Error("Miscalculated data length (wrote " + index + " instead of " + out.length + ")"); + } else { + return out; + } + } + + static { + int i; + for (i = 0; i < 256; ++i) { + codes[i] = -1; + } + + for (i = 65; i <= 90; ++i) { + codes[i] = (byte) (i - 65); + } + + for (i = 97; i <= 122; ++i) { + codes[i] = (byte) (26 + i - 97); + } + + for (i = 48; i <= 57; ++i) { + codes[i] = (byte) (52 + i - 48); + } + + codes[43] = 62; + codes[47] = 63; + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/Base64Utils.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/Base64Utils.java new file mode 100644 index 0000000000..9c9a258671 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/Base64Utils.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.utils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; + +public class Base64Utils { + private static final int CACHE_SIZE = 1024; + + public Base64Utils() { + } + + public static byte[] decode(String base64) throws Exception { + return Base64.decode(base64.toCharArray()); + } + + public static String encode(byte[] bytes) throws Exception { + return new String(Base64.encode(bytes)); + } + + public static String encodeFile(String filePath) throws Exception { + byte[] bytes = fileToByte(filePath); + return encode(bytes); + } + + public static void decodeToFile(String filePath, String base64) throws Exception { + byte[] bytes = decode(base64); + byteArrayToFile(bytes, filePath); + } + + public static byte[] fileToByte(String filePath) throws Exception { + byte[] data = new byte[0]; + File file = new File(filePath); + if (file.exists()) { + FileInputStream in = new FileInputStream(file); + ByteArrayOutputStream out = new ByteArrayOutputStream(2048); + byte[] cache = new byte[1024]; + int nread; + + while ((nread = in.read(cache)) != -1) { + out.write(cache, 0, nread); + out.flush(); + } + + out.close(); + in.close(); + data = out.toByteArray(); + } + + return data; + } + + public static void byteArrayToFile(byte[] bytes, String filePath) throws Exception { + InputStream in = new ByteArrayInputStream(bytes); + File destFile = new File(filePath); + if (!destFile.getParentFile().exists()) { + destFile.getParentFile().mkdirs(); + } + + destFile.createNewFile(); + OutputStream out = new FileOutputStream(destFile); + byte[] cache = new byte[1024]; + + int nread; + while ((nread = ((InputStream) in).read(cache)) != -1) { + ((OutputStream) out).write(cache, 0, nread); + ((OutputStream) out).flush(); + } + + ((OutputStream) out).close(); + ((InputStream) in).close(); + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/EncryptUtil.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/EncryptUtil.java new file mode 100644 index 0000000000..06c8bbc330 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/EncryptUtil.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.utils; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; + +public class EncryptUtil { + public EncryptUtil() { + } + + private static byte[] hexStringToBytes(String hexString) { + if (hexString != null && !hexString.equals("")) { + hexString = hexString.toUpperCase(); + int length = hexString.length() / 2; + char[] hexChars = hexString.toCharArray(); + byte[] d = new byte[length]; + + for (int i = 0; i < length; ++i) { + int pos = i * 2; + d[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1])); + } + + return d; + } else { + return null; + } + } + + public static String byteToHexString(byte[] b) { + String a = ""; + + for (int i = 0; i < b.length; ++i) { + String hex = Integer.toHexString(b[i] & 255); + if (hex.length() == 1) { + hex = '0' + hex; + } + + a = a + hex; + } + + return a; + } + + private static byte charToByte(char c) { + return (byte) "0123456789ABCDEF".indexOf(c); + } + + private static String readFileContent(String filePath) { + File file = new File(filePath); + BufferedReader reader = null; + StringBuffer key = new StringBuffer(); + + try { + IOException e; + try { + reader = new BufferedReader(new FileReader(file)); + e = null; + + String tempString; + while ((tempString = reader.readLine()) != null) { + if (!tempString.startsWith("--")) { + key.append(tempString); + } + } + + reader.close(); + } catch (IOException ioException) { + e = ioException; + e.printStackTrace(); + } + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException ioException) { + ioException.printStackTrace(); + } + } + + } + + return key.toString(); + } + + public static String decrypt(String sysPubKeyFile, String appPrivKeyFile, String encStr) throws Exception { + String pubKeyBase64 = readFileContent(sysPubKeyFile); + String privKeyBase64 = readFileContent(appPrivKeyFile); + byte[] encBin = hexStringToBytes(encStr); + byte[] pubDecBin = RSAUtils.decryptByPublicKeyBlock(encBin, pubKeyBase64); + byte[] privDecBin = RSAUtils.decryptByPrivateKeyBlock(pubDecBin, privKeyBase64); + return new String(privDecBin); + } + + public static String decrypt(ParamType pubKeyType, String sysPubKey, ParamType privKeyType, String appPrivKey, ParamType passwdType, + String passwd) throws Exception { + String pubKeyBase64 = pubKeyType == ParamType.FILE ? readFileContent(sysPubKey) : sysPubKey; + String privKeyBase64 = privKeyType == ParamType.FILE ? readFileContent(appPrivKey) : appPrivKey; + String passwdContent = passwdType == ParamType.FILE ? readFileContent(passwd) : passwd; + byte[] encBin = hexStringToBytes(passwdContent); + byte[] pubDecBin = RSAUtils.decryptByPublicKeyBlock(encBin, pubKeyBase64); + byte[] privDecBin = RSAUtils.decryptByPrivateKeyBlock(pubDecBin, privKeyBase64); + return new String(privDecBin); + } + + public static String encrypt(String appPubKeyFile, String sysPrivKeyFile, String passwd) throws Exception { + String pubKeyBase64 = readFileContent(appPubKeyFile); + String privKeyBase64 = readFileContent(sysPrivKeyFile); + byte[] pubEncBin = RSAUtils.encryptByPublicKeyBlock(passwd.getBytes(), pubKeyBase64); + byte[] privEncBin = RSAUtils.encryptByPrivateKeyBlock(pubEncBin, privKeyBase64); + return byteToHexString(privEncBin); + } + + public static String encrypt(ParamType pubKeyType, String appPubKey, ParamType privKeyType, String sysPrivKey, String passwd) throws Exception { + String pubKeyBase64 = pubKeyType == ParamType.FILE ? readFileContent(appPubKey) : appPubKey; + String privKeyBase64 = privKeyType == ParamType.FILE ? readFileContent(sysPrivKey) : sysPrivKey; + byte[] pubEncBin = RSAUtils.encryptByPublicKeyBlock(passwd.getBytes(), pubKeyBase64); + byte[] privEncBin = RSAUtils.encryptByPrivateKeyBlock(pubEncBin, privKeyBase64); + return byteToHexString(privEncBin); + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/JdbcUtils.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/JdbcUtils.java new file mode 100644 index 0000000000..c012806e2e --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/JdbcUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.utils; + +import com.alibaba.druid.pool.DruidDataSource; + +public class JdbcUtils { + + public static DruidDataSource createDruidDataSource(String url, String userName, String passWord) { + DruidDataSource dataSource = new DruidDataSource(); + dataSource.setUrl(url); + dataSource.setUsername(userName); + dataSource.setPassword(passWord); + dataSource.setInitialSize(5); + dataSource.setMinIdle(5); + dataSource.setMaxActive(20); + dataSource.setMaxWait(60000); + dataSource.setTimeBetweenEvictionRunsMillis(60000); + dataSource.setMinEvictableIdleTimeMillis(300000); + dataSource.setValidationQuery("SELECT 1"); + dataSource.setTestWhileIdle(true); + dataSource.setTestOnBorrow(false); + dataSource.setTestOnReturn(false); + dataSource.setPoolPreparedStatements(true); + dataSource.setMaxPoolPreparedStatementPerConnectionSize(20); + return dataSource; + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/ParamType.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/ParamType.java new file mode 100644 index 0000000000..ed58a49b89 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/ParamType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.utils; + +public enum ParamType { + FILE, + STRING; + + private ParamType() { + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/RSAUtils.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/RSAUtils.java new file mode 100644 index 0000000000..9353eb3f17 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/utils/RSAUtils.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.utils; + +import java.io.ByteArrayOutputStream; +import java.security.Key; +import java.security.KeyFactory; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.Signature; +import java.security.interfaces.RSAPrivateKey; +import java.security.interfaces.RSAPublicKey; +import java.security.spec.PKCS8EncodedKeySpec; +import java.security.spec.X509EncodedKeySpec; +import java.util.HashMap; +import java.util.Map; + +import javax.crypto.Cipher; + +public class RSAUtils { + public static final String KEY_ALGORITHM = "RSA"; + public static final String SIGNATURE_ALGORITHM = "MD5withRSA"; + private static final String PUBLIC_KEY = "RSAPublicKey"; + private static final String PRIVATE_KEY = "RSAPrivateKey"; + private static final int MAX_ENCRYPT_BLOCK = 117; + private static final int MAX_DECRYPT_BLOCK = 128; + + public RSAUtils() { + } + + public static Map genKeyPair() throws Exception { + KeyPairGenerator keyPairGen = KeyPairGenerator.getInstance("RSA"); + keyPairGen.initialize(1024); + KeyPair keyPair = keyPairGen.generateKeyPair(); + RSAPublicKey publicKey = (RSAPublicKey) keyPair.getPublic(); + RSAPrivateKey privateKey = (RSAPrivateKey) keyPair.getPrivate(); + Map keyMap = new HashMap(2); + keyMap.put("RSAPublicKey", publicKey); + keyMap.put("RSAPrivateKey", privateKey); + return keyMap; + } + + public static String sign(byte[] data, String privateKey) throws Exception { + byte[] keyBytes = Base64Utils.decode(privateKey); + PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec(keyBytes); + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + PrivateKey privateK = keyFactory.generatePrivate(pkcs8KeySpec); + Signature signature = Signature.getInstance("MD5withRSA"); + signature.initSign(privateK); + signature.update(data); + return Base64Utils.encode(signature.sign()); + } + + public static boolean verify(byte[] data, String publicKey, String sign) throws Exception { + byte[] keyBytes = Base64Utils.decode(publicKey); + X509EncodedKeySpec keySpec = new X509EncodedKeySpec(keyBytes); + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + PublicKey publicK = keyFactory.generatePublic(keySpec); + Signature signature = Signature.getInstance("MD5withRSA"); + signature.initVerify(publicK); + signature.update(data); + return signature.verify(Base64Utils.decode(sign)); + } + + public static byte[] decryptByPrivateKey(byte[] encryptedData, String privateKey) throws Exception { + byte[] keyBytes = Base64Utils.decode(privateKey); + PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec(keyBytes); + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + Key privateK = keyFactory.generatePrivate(pkcs8KeySpec); + Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm()); + cipher.init(2, privateK); + int inputLen = encryptedData.length; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int offSet = 0; + + for (int i = 0; inputLen - offSet > 0; offSet = i * 128) { + byte[] cache; + if (inputLen - offSet > 128) { + cache = cipher.doFinal(encryptedData, offSet, 128); + } else { + cache = cipher.doFinal(encryptedData, offSet, inputLen - offSet); + } + + out.write(cache, 0, cache.length); + ++i; + } + + byte[] decryptedData = out.toByteArray(); + out.close(); + return decryptedData; + } + + public static byte[] decryptByPrivateKeyBlock(byte[] encryptedData, String privateKey) throws Exception { + byte[] keyBytes = Base64Utils.decode(privateKey); + PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec(keyBytes); + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + Key privateK = keyFactory.generatePrivate(pkcs8KeySpec); + Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm()); + cipher.init(2, privateK); + int inputLen = encryptedData.length; + int offSet = 0; + byte[] cache = cipher.doFinal(encryptedData, offSet, inputLen); + return cache; + } + + public static byte[] decryptByPublicKey(byte[] encryptedData, String publicKey) throws Exception { + byte[] keyBytes = Base64Utils.decode(publicKey); + X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(keyBytes); + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + Key publicK = keyFactory.generatePublic(x509KeySpec); + Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm()); + cipher.init(2, publicK); + int inputLen = encryptedData.length; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int offSet = 0; + + for (int i = 0; inputLen - offSet > 0; offSet = i * 128) { + byte[] cache; + if (inputLen - offSet > 128) { + cache = cipher.doFinal(encryptedData, offSet, 128); + } else { + cache = cipher.doFinal(encryptedData, offSet, inputLen - offSet); + } + + out.write(cache, 0, cache.length); + ++i; + } + + byte[] decryptedData = out.toByteArray(); + out.close(); + return decryptedData; + } + + public static byte[] decryptByPublicKeyBlock(byte[] encryptedData, String publicKey) throws Exception { + byte[] keyBytes = Base64Utils.decode(publicKey); + X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(keyBytes); + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + Key publicK = keyFactory.generatePublic(x509KeySpec); + Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm()); + cipher.init(2, publicK); + int inputLen = encryptedData.length; + int offSet = 0; + byte[] cache = cipher.doFinal(encryptedData, offSet, inputLen); + return cache; + } + + public static byte[] encryptByPublicKey(byte[] data, String publicKey) throws Exception { + byte[] keyBytes = Base64Utils.decode(publicKey); + X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(keyBytes); + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + Key publicK = keyFactory.generatePublic(x509KeySpec); + Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm()); + cipher.init(1, publicK); + int inputLen = data.length; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int offSet = 0; + + for (int i = 0; inputLen - offSet > 0; offSet = i * 117) { + byte[] cache; + if (inputLen - offSet > 117) { + cache = cipher.doFinal(data, offSet, 117); + } else { + cache = cipher.doFinal(data, offSet, inputLen - offSet); + } + + out.write(cache, 0, cache.length); + ++i; + } + + byte[] encryptedData = out.toByteArray(); + out.close(); + return encryptedData; + } + + public static byte[] encryptByPublicKeyBlock(byte[] data, String publicKey) throws Exception { + byte[] keyBytes = Base64Utils.decode(publicKey); + X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(keyBytes); + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + Key publicK = keyFactory.generatePublic(x509KeySpec); + Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm()); + cipher.init(1, publicK); + int inputLen = data.length; + int offSet = 0; + byte[] cache = cipher.doFinal(data, offSet, inputLen); + return cache; + } + + public static byte[] encryptByPrivateKey(byte[] data, String privateKey) throws Exception { + byte[] keyBytes = Base64Utils.decode(privateKey); + PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec(keyBytes); + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + Key privateK = keyFactory.generatePrivate(pkcs8KeySpec); + Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm()); + cipher.init(1, privateK); + int inputLen = data.length; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int offSet = 0; + + for (int i = 0; inputLen - offSet > 0; offSet = i * 117) { + byte[] cache; + if (inputLen - offSet > 117) { + cache = cipher.doFinal(data, offSet, 117); + } else { + cache = cipher.doFinal(data, offSet, inputLen - offSet); + } + + out.write(cache, 0, cache.length); + ++i; + } + + byte[] encryptedData = out.toByteArray(); + out.close(); + return encryptedData; + } + + public static byte[] encryptByPrivateKeyBlock(byte[] data, String privateKey) throws Exception { + byte[] keyBytes = Base64Utils.decode(privateKey); + PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec(keyBytes); + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + Key privateK = keyFactory.generatePrivate(pkcs8KeySpec); + Cipher cipher = Cipher.getInstance(keyFactory.getAlgorithm()); + cipher.init(1, privateK); + int inputLen = data.length; + int offSet = 0; + byte[] cache = cipher.doFinal(data, offSet, inputLen); + return cache; + } + + public static String getPrivateKey(Map keyMap) throws Exception { + Key key = (Key) keyMap.get("RSAPrivateKey"); + return Base64Utils.encode(key.getEncoded()); + } + + public static String getPublicKey(Map keyMap) throws Exception { + Key key = (Key) keyMap.get("RSAPublicKey"); + return Base64Utils.encode(key.getEncoded()); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java index da9daffe9c..150a67e302 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java @@ -24,7 +24,7 @@ @ToString public enum JobState { - INIT, RUNNING, COMPLETE, DELETE, FAIL; + INIT, RUNNING, COMPLETE, DELETE, FAIL, PAUSE; private static final JobState[] STATES_NUM_INDEX = JobState.values(); private static final Map STATES_NAME_INDEX = new HashMap<>(); diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/QueryTaskInfoRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/QueryTaskInfoRequest.java new file mode 100644 index 0000000000..c0973cf63d --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/QueryTaskInfoRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.request; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class QueryTaskInfoRequest { + + private String taskDesc; + + private String taskID; + + private String jobType; + + private String sourceDataID; + + private String targetDataID; + + private String ip; + + private String sourceTableName; + + private String taskMathID; + + private Integer currentPage; + + private Integer pageSize; + +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/QueryTaskMonitorRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/QueryTaskMonitorRequest.java new file mode 100644 index 0000000000..cd777d5019 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/QueryTaskMonitorRequest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.request; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@EqualsAndHashCode(callSuper = true) +@ToString +public class QueryTaskMonitorRequest extends BaseRemoteRequest { + private String taskID; + private String jobID; + private long limit; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/RecordPositionRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/RecordPositionRequest.java new file mode 100644 index 0000000000..b04a6f1041 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/RecordPositionRequest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.request; + +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@EqualsAndHashCode(callSuper = true) +@ToString +public class RecordPositionRequest extends BaseRemoteRequest { + + private String fullJobID; + + private String increaseJobID; + // prepare to update job state to current state + private JobState updateState; + + private String address; + + private DataSourceType dataSourceType; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/TaskBachRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/TaskBachRequest.java new file mode 100644 index 0000000000..306badc156 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/TaskBachRequest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.request; + +import lombok.Data; + +@Data +public class TaskBachRequest { + + private String taskID; + + private String taskName; + +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/TaskIDRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/TaskIDRequest.java new file mode 100644 index 0000000000..37a72916ff --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/TaskIDRequest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.request; + +import lombok.Data; + +@Data +public class TaskIDRequest { + + private String taskID; + +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java index 3ea8401535..84ea3661bd 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseRemoteResponse.java @@ -26,7 +26,7 @@ import lombok.Setter; @Getter -public abstract class BaseRemoteResponse implements IPayload { +public abstract class BaseRemoteResponse implements IPayload { @Setter private boolean success = true; @Setter @@ -35,6 +35,8 @@ public abstract class BaseRemoteResponse implements IPayload { private String desc; private Map header = new HashMap<>(); + @Setter + private T data; public void addHeader(String key, String value) { if (key == null || value == null) { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java index 11678dfcf0..24e7871e04 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java @@ -24,7 +24,7 @@ import lombok.Data; @Data -public class CreateTaskResponse extends BaseRemoteResponse { +public class CreateTaskResponse { private String taskId; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/HttpResponseResult.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/HttpResponseResult.java new file mode 100644 index 0000000000..b6ca8cef0d --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/HttpResponseResult.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.response; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class HttpResponseResult { + + private Integer code; + + private String message; + + private T data; + + public HttpResponseResult(Integer code, String message) { + this.code = code; + this.message = message; + } + + public HttpResponseResult(Integer code, T data) { + this.code = code; + this.data = data; + } + + public static HttpResponseResult success() { + return new HttpResponseResult<>(200, "success"); + } + + public static HttpResponseResult success(T data) { + return new HttpResponseResult<>(200, "success", data); + } + + public static HttpResponseResult failed() { + return new HttpResponseResult<>(500, "failed"); + } + + public static HttpResponseResult failed(T data) { + return new HttpResponseResult<>(500, "failed", data); + } + + public static HttpResponseResult exception(T data) { + return new HttpResponseResult<>(300, data); + } + +} \ No newline at end of file diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/QueryTaskInfoResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/QueryTaskInfoResponse.java new file mode 100644 index 0000000000..4c0c536eae --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/QueryTaskInfoResponse.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.response; + +import java.util.Date; +import java.util.List; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class QueryTaskInfoResponse { + + // event_mesh_task_info + private Integer id; + + private String taskID; + + private String taskName; + + private String taskDesc; + + private String taskState; + + private String sourceRegion; + + private String targetRegion; + + private String createUid; + + private String updateUid; + + private Date createTime; + + private Date updateTime; + + List eventMeshJobInfoList; + + @Data + public static class EventMeshJobInfo { + // event_mesh_job_info + private Integer id; + + private String jobID; + + private String jobDesc; + + private String taskID; + + private String transportType; + + private Integer sourceData; + + private Integer targetData; + + private String jobState; + + private String jobType; + + // job request from region + private String fromRegion; + + // job actually running region + private String runningRegion; + + private String createUid; + + private String updateUid; + + private Date createTime; + + private Date updateTime; + + // private List eventMeshDataSource; + + private EventMeshDataSource dataSource; + + private EventMeshDataSource dataSink; + + private EventMeshMysqlPosition eventMeshMysqlPosition; + + } + + @Data + public static class EventMeshDataSource { + + private Integer id; + + private String dataType; + + private String description; + + private String configuration; + + private String configurationClass; + + private String region; + + private String createUid; + + private String updateUid; + + private Date createTime; + + private Date updateTime; + } + + @Data + public static class EventMeshMysqlPosition { + + private Integer id; + + private String jobID; + + private String serverUUID; + + private String address; + + private Long position; + + private String gtid; + + private String currentGtid; + + private Long timestamp; + + private String journalName; + + private Date createTime; + + private Date updateTime; + } + +} \ No newline at end of file diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/QueryTaskMonitorResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/QueryTaskMonitorResponse.java new file mode 100644 index 0000000000..432729a995 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/QueryTaskMonitorResponse.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.response; + +import org.apache.eventmesh.common.remote.task.TaskMonitor; + +import java.util.List; + +import lombok.Data; + +@Data +public class QueryTaskMonitorResponse { + + private List taskMonitors; + +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/task/TaskMonitor.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/task/TaskMonitor.java new file mode 100644 index 0000000000..6d303eec3b --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/task/TaskMonitor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote.task; + +import java.io.Serializable; +import java.util.Date; + +import lombok.Data; + +@Data +public class TaskMonitor implements Serializable { + private String taskID; + private String jobID; + private String address; + private String transportType; + private String connectorStage; + private long totalReqNum; + private long totalTimeCost; + private long maxTimeCost; + private long avgTimeCost; + private double tps; + private Date createTime; + private static final long serialVersionUID = 1L; + +}