From 61fe5179c93f4e6c574b2205a923953e8e668bbe Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Tue, 20 Aug 2024 14:59:54 +0800 Subject: [PATCH] [ISSUE #5079] Enhancement update for admin-server --- eventmesh-admin-server/bin/start-admin.sh | 51 ++++++------ eventmesh-admin-server/build.gradle | 2 + eventmesh-admin-server/conf/application.yaml | 8 +- eventmesh-admin-server/conf/eventmesh.sql | 2 +- .../conf/mapper/EventMeshVerifyMapper.xml | 5 +- .../admin/server/web/HttpServer.java | 22 +++++ .../admin/server/web/db/DBThreadPool.java | 26 +++++- .../server/web/db/entity/EventMeshVerify.java | 2 + .../handler/impl/FetchJobRequestHandler.java | 2 +- .../handler/impl/ReportJobRequestHandler.java | 56 +++++++++++++ .../handler/impl/ReportPositionHandler.java | 1 + .../web/handler/impl/ReportVerifyHandler.java | 50 ++++++++++-- .../web/service/job/JobInfoBizService.java | 81 ++++++++++++++++--- .../position/impl/HttpPositionHandler.java | 57 +++++++++++++ .../web/service/verify/VerifyBizService.java | 1 + .../eventmesh/common/remote/JobState.java | 51 ++++++++++++ .../remote/request/ReportVerifyRequest.java | 2 + ...e.eventmesh.common.remote.payload.IPayload | 1 + .../offsetmgmt/admin/AdminOffsetService.java | 3 + 19 files changed, 370 insertions(+), 53 deletions(-) create mode 100644 eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java create mode 100644 eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java diff --git a/eventmesh-admin-server/bin/start-admin.sh b/eventmesh-admin-server/bin/start-admin.sh index 93c3644397..1633036617 100644 --- a/eventmesh-admin-server/bin/start-admin.sh +++ b/eventmesh-admin-server/bin/start-admin.sh @@ -56,34 +56,34 @@ function extract_java_version { #} function get_pid { - local ppid="" - if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then - ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file) - # If the process does not exist, it indicates that the previous process terminated abnormally. + local ppid="" + if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then + ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file) + # If the process does not exist, it indicates that the previous process terminated abnormally. if [ ! -d /proc/$ppid ]; then # Remove the residual file. rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output." ppid="" fi - else - if [[ $OS =~ Msys ]]; then - # There is a Bug on Msys that may not be able to kill the identified process - ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}` - elif [[ $OS =~ Darwin ]]; then - # Known problem: grep Java may not be able to accurately identify Java processes - ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'}) - else - if [ $DOCKER ]; then - # No need to exclude root user in Docker containers. - ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'}) - else + else + if [[ $OS =~ Msys ]]; then + # There is a Bug on Msys that may not be able to kill the identified process + ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}` + elif [[ $OS =~ Darwin ]]; then + # Known problem: grep Java may not be able to accurately identify Java processes + ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'}) + else + if [ $DOCKER ]; then + # No need to exclude root user in Docker containers. + ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'}) + else # It is required to identify the process as accurately as possible on Linux. ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'}) fi - fi - fi - echo "$ppid"; + fi + fi + echo "$ppid"; } #=========================================================================================== @@ -136,8 +136,7 @@ export JAVA_HOME GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log" -#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4" -JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}` +JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g" JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50" JAVA_OPT="${JAVA_OPT} -verbose:gc" if [[ "$JAVA_VERSION" == "8" ]]; then @@ -172,7 +171,7 @@ JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin" # echo "proxy is running already" # exit 9; # else -# echo "err pid$pid, rm pid.file" +# echo "err pid$pid, rm pid.file" # rm pid.file # fi #fi @@ -183,8 +182,8 @@ if [[ $pid == "ERROR"* ]]; then exit 9 fi if [ -n "$pid" ]; then - echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again." - exit 9 + echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again." + exit 9 fi make_logs_dir @@ -193,9 +192,9 @@ echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_H EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer if [ $DOCKER ]; then - $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out + $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out else - $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 & + $JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 & echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file fi exit 0 diff --git a/eventmesh-admin-server/build.gradle b/eventmesh-admin-server/build.gradle index 1fec2c7c52..95c8fa1372 100644 --- a/eventmesh-admin-server/build.gradle +++ b/eventmesh-admin-server/build.gradle @@ -38,6 +38,8 @@ dependencies { implementation "com.alibaba:druid-spring-boot-starter" compileOnly 'com.mysql:mysql-connector-j' compileOnly 'org.projectlombok:lombok' + testImplementation 'junit:junit:4.12' + testImplementation 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' } diff --git a/eventmesh-admin-server/conf/application.yaml b/eventmesh-admin-server/conf/application.yaml index 274196db60..3d702e579e 100644 --- a/eventmesh-admin-server/conf/application.yaml +++ b/eventmesh-admin-server/conf/application.yaml @@ -35,8 +35,8 @@ event-mesh: # grpc server port port: 8081 adminServerList: - region1: + R1: - http://localhost:8082 - region2: - - http://localhost:8083 - region: region1 \ No newline at end of file + R2: + - http://localhost:8082 + region: R1 \ No newline at end of file diff --git a/eventmesh-admin-server/conf/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql index 986320570a..6e28daca8a 100644 --- a/eventmesh-admin-server/conf/eventmesh.sql +++ b/eventmesh-admin-server/conf/eventmesh.sql @@ -102,7 +102,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` ( `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), - UNIQUE KEY `runtimeAddr` (`runtimeAddr`), KEY `jobID` (`jobID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; @@ -137,6 +136,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` ( CREATE TABLE IF NOT EXISTS `event_mesh_verify` ( `id` int unsigned NOT NULL AUTO_INCREMENT, `taskID` varchar(50) COLLATE utf8_bin DEFAULT NULL, + `jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL, `recordID` varchar(50) COLLATE utf8_bin DEFAULT NULL, `recordSig` varchar(50) COLLATE utf8_bin DEFAULT NULL, `connectorName` varchar(200) COLLATE utf8_bin DEFAULT NULL, diff --git a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml index b7b042145a..45727498cc 100644 --- a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml +++ b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml @@ -26,6 +26,7 @@ + @@ -35,8 +36,8 @@ - id,taskID,recordID, - recordSig,connectorName,connectorStage, + id,taskID,jobID,recordID, + recordSig,connectorName,connectorStage, position,createTime diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java index 12afb3a3d4..8350802f75 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java @@ -17,8 +17,11 @@ package org.apache.eventmesh.admin.server.web; +import lombok.extern.slf4j.Slf4j; import org.apache.eventmesh.admin.server.web.service.task.TaskBizService; +import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService; import org.apache.eventmesh.common.remote.request.CreateTaskRequest; +import org.apache.eventmesh.common.remote.request.ReportVerifyRequest; import org.apache.eventmesh.common.remote.response.CreateTaskResponse; import org.apache.eventmesh.common.utils.JsonUtils; @@ -31,17 +34,36 @@ @RestController @RequestMapping("/eventmesh/admin") +@Slf4j public class HttpServer { @Autowired private TaskBizService taskService; + @Autowired + private VerifyBizService verifyService; + @RequestMapping(value = "/createTask", method = RequestMethod.POST) public ResponseEntity createOrUpdateTask(@RequestBody CreateTaskRequest task) { + log.info("receive http proto create task:{}",task); CreateTaskResponse createTaskResponse = taskService.createTask(task); + log.info("receive http proto create task result:{}",createTaskResponse); return ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse))); } + + @RequestMapping(value = "/reportVerify", method = RequestMethod.POST) + public ResponseEntity reportVerify(@RequestBody ReportVerifyRequest request) { + log.info("receive http proto report verify request:{}", request); + boolean result = verifyService.reportVerifyRecord(request); + log.info("receive http proto report verify result:{}", result); + if (result) { + return ResponseEntity.ok("report verify success.request:" + JsonUtils.toJSONString(request)); + } else { + return ResponseEntity.internalServerError().body("report verify success.request:" + JsonUtils.toJSONString(request)); + } + } + public boolean deleteTask(Long id) { return false; } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java index f1de764967..124eca4261 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java @@ -20,6 +20,7 @@ import org.apache.eventmesh.common.EventMeshThreadFactory; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -39,17 +40,34 @@ public class DBThreadPool { new LinkedBlockingQueue<>(1000), new EventMeshThreadFactory("admin-server-db"), new ThreadPoolExecutor.DiscardOldestPolicy()); + + private final ScheduledThreadPoolExecutor checkScheduledExecutor = + new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new EventMeshThreadFactory("admin-server-check-scheduled"), + new ThreadPoolExecutor.DiscardOldestPolicy()); + @PreDestroy private void destroy() { if (!executor.isShutdown()) { try { executor.shutdown(); if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { - log.info("wait heart beat handler thread pool shutdown timeout, it will shutdown immediately"); + log.info("wait handler thread pool shutdown timeout, it will shutdown immediately"); executor.shutdownNow(); } } catch (InterruptedException e) { - log.warn("wait heart beat handler thread pool shutdown fail"); + log.warn("wait handler thread pool shutdown fail"); + } + } + + if (!checkScheduledExecutor.isShutdown()) { + try { + checkScheduledExecutor.shutdown(); + if (!checkScheduledExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + log.info("wait scheduled thread pool shutdown timeout, it will shutdown immediately"); + checkScheduledExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + log.warn("wait scheduled thread pool shutdown fail"); } } } @@ -57,4 +75,8 @@ private void destroy() { public ThreadPoolExecutor getExecutors() { return executor; } + + public ScheduledThreadPoolExecutor getCheckExecutor() { + return checkScheduledExecutor; + } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java index 5425c5c57b..9d3e817ff9 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java @@ -37,6 +37,8 @@ public class EventMeshVerify implements Serializable { private String taskID; + private String jobID; + private String recordID; private String recordSig; diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java index b377bcddd8..3392084c28 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java @@ -56,7 +56,7 @@ public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) { config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf())); config.setSourceConnectorDesc(detail.getSourceConnectorDesc()); config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf())); - config.setSourceConnectorDesc(detail.getSinkConnectorDesc()); + config.setSinkConnectorDesc(detail.getSinkConnectorDesc()); response.setConnectorConfig(config); response.setTransportType(detail.getTransportType()); response.setState(detail.getState()); diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java new file mode 100644 index 0000000000..defec3f8ee --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.handler.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; +import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.request.ReportJobRequest; +import org.apache.eventmesh.common.remote.response.SimpleResponse; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class ReportJobRequestHandler extends BaseRequestHandler { + + @Autowired + JobInfoBizService jobInfoBizService; + + @Override + public SimpleResponse handler(ReportJobRequest request, Metadata metadata) { + log.info("receive report job request:{}", request); + if (StringUtils.isBlank(request.getJobID())) { + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty"); + } + EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(request.getJobID()); + if (jobInfo == null) { + return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, not exist target job,jobID:" + request.getJobID()); + } + boolean result = jobInfoBizService.updateJobState(jobInfo.getJobID(), request.getState()); + if (result) { + return SimpleResponse.success(); + } else { + return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "update job failed."); + } + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java index 5e2a968262..78335d419a 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java @@ -48,6 +48,7 @@ public class ReportPositionHandler extends BaseRequestHandler adminServerList = properties.getAdminServerList().get(fromRegion); + if (adminServerList == null || adminServerList.isEmpty()) { + throw new RuntimeException("No admin server available for region: " + fromRegion); + } + String targetUrl = adminServerList.get(new Random().nextInt(adminServerList.size())) + "/eventmesh/admin/reportVerify"; + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity response = restTemplate.postForEntity(targetUrl, request, String.class); + if (!response.getStatusCode().is2xxSuccessful()) { + return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save verify request fail,code:" + response.getStatusCode() + ",msg:" + response.getBody()); + } + return SimpleResponse.success(); } - return verifyService.reportVerifyRecord(request) ? SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save verify " - + "request fail"); } } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java index 0657383e23..70abececb1 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java @@ -19,15 +19,19 @@ import org.apache.eventmesh.admin.server.AdminServerProperties; import org.apache.eventmesh.admin.server.AdminServerRuntimeException; +import org.apache.eventmesh.admin.server.web.db.DBThreadPool; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat; import org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService; import org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService; import org.apache.eventmesh.admin.server.web.pojo.JobDetail; import org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService; import org.apache.eventmesh.admin.server.web.service.position.PositionBizService; import org.apache.eventmesh.common.config.connector.Config; +import org.apache.eventmesh.common.remote.JobState; import org.apache.eventmesh.common.remote.TaskState; import org.apache.eventmesh.common.remote.TransportType; import org.apache.eventmesh.common.remote.datasource.DataSource; @@ -35,20 +39,18 @@ import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.request.CreateOrUpdateDataSourceReq; import org.apache.eventmesh.common.utils.JsonUtils; - import org.apache.commons.lang3.StringUtils; - +import java.time.Duration; import java.util.LinkedList; import java.util.List; import java.util.UUID; - +import java.util.concurrent.TimeUnit; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; - import com.baomidou.mybatisplus.core.toolkit.Wrappers; - import lombok.extern.slf4j.Slf4j; +import javax.annotation.PostConstruct; /** * for table 'event_mesh_job_info' db operation @@ -75,13 +77,41 @@ public class JobInfoBizService { @Autowired private AdminServerProperties properties; + @Autowired + EventMeshRuntimeHeartbeatService heartbeatService; + + private final long heatBeatPeriod = Duration.ofMillis(5000).toMillis(); + + @Autowired + DBThreadPool executor; + + @PostConstruct + public void init() { + log.info("init check job info scheduled task."); + executor.getCheckExecutor().scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + checkJobInfo(); + } + }, 10, 10, TimeUnit.SECONDS); + } + public boolean updateJobState(String jobID, TaskState state) { if (jobID == null || state == null) { return false; } EventMeshJobInfo jobInfo = new EventMeshJobInfo(); jobInfo.setJobState(state.name()); - return jobInfoService.update(jobInfo, Wrappers.update().eq("jobID", jobID).ne("state", TaskState.DELETE.name())); + return jobInfoService.update(jobInfo, Wrappers.update().eq("jobID", jobID).ne("jobState", JobState.DELETE.name())); + } + + public boolean updateJobState(String jobID, JobState state) { + if (jobID == null || state == null) { + return false; + } + EventMeshJobInfo jobInfo = new EventMeshJobInfo(); + jobInfo.setJobState(state.name()); + return jobInfoService.update(jobInfo, Wrappers.update().eq("jobID", jobID).ne("jobState", JobState.DELETE.name())); } @Transactional @@ -114,7 +144,8 @@ public List createJobs(List jobs) { source.setOperator(job.getCreateUid()); source.setRegion(job.getSourceDataSource().getRegion()); source.setDesc(job.getSourceConnectorDesc()); - source.setConfig(job.getSourceDataSource().getConf()); + Config sourceConfig = job.getSourceDataSource().getConf(); + source.setConfig(sourceConfig); source.setConfigClass(job.getSourceDataSource().getConfClazz().getName()); EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source); entity.setSourceData(createdSource.getId()); @@ -124,7 +155,8 @@ public List createJobs(List jobs) { sink.setOperator(job.getCreateUid()); sink.setRegion(job.getSinkDataSource().getRegion()); sink.setDesc(job.getSinkConnectorDesc()); - sink.setConfig(job.getSinkDataSource().getConf()); + Config sinkConfig = job.getSinkDataSource().getConf(); + sink.setConfig(sinkConfig); sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName()); EventMeshDataSource createdSink = dataSourceBizService.createDataSource(sink); entity.setTargetData(createdSink.getId()); @@ -134,7 +166,7 @@ public List createJobs(List jobs) { int changed = jobInfoExtService.batchSave(entityList); if (changed != jobs.size()) { throw new AdminServerRuntimeException(ErrorCode.INTERNAL_ERR, String.format("create [%d] jobs of not match expect [%d]", - changed, jobs.size())); + changed, jobs.size())); } return entityList; } @@ -168,7 +200,7 @@ public JobDetail getJobDetail(String jobID) { detail.setSourceConnectorDesc(source.getDescription()); if (source.getDataType() != null) { detail.setPositions(positionBizService.getPositionByJobID(job.getJobID(), - DataSourceType.getDataSourceType(source.getDataType()))); + DataSourceType.getDataSourceType(source.getDataType()))); } } @@ -195,6 +227,35 @@ public JobDetail getJobDetail(String jobID) { detail.setTransportType(TransportType.getTransportType(job.getTransportType())); return detail; } + + public EventMeshJobInfo getJobInfo(String jobID) { + if (jobID == null) { + return null; + } + EventMeshJobInfo job = jobInfoService.getOne(Wrappers.query().eq("jobID", jobID)); + return job; + } + + public void checkJobInfo() { + List eventMeshJobInfoList = jobInfoService.list(Wrappers.query().eq("jobState", JobState.RUNNING.name())); + log.info("start check job info.to check job size:{}", eventMeshJobInfoList.size()); + for (EventMeshJobInfo jobInfo : eventMeshJobInfoList) { + String jobID = jobInfo.getJobID(); + if (StringUtils.isEmpty(jobID)) { + continue; + } + EventMeshRuntimeHeartbeat heartbeat = heartbeatService.getOne(Wrappers.query().eq("jobID", jobID)); + if (heartbeat == null) { + continue; + } + // if last heart beat update time have delay three period.print job heart beat delay warn + long currentTimeStamp = System.currentTimeMillis(); + if (currentTimeStamp - heartbeat.getUpdateTime().getTime() > 3 * heatBeatPeriod) { + log.warn("current job heart heart has delay.jobID:{},currentTimeStamp:{},last update time:{}", jobID, currentTimeStamp, heartbeat.getUpdateTime()); + } + } + } + } diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java new file mode 100644 index 0000000000..b0f89ec03d --- /dev/null +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.server.web.service.position.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService; +import org.apache.eventmesh.admin.server.web.service.position.PositionHandler; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.remote.datasource.DataSourceType; +import org.apache.eventmesh.common.remote.offset.RecordPosition; +import org.apache.eventmesh.common.remote.request.FetchPositionRequest; +import org.apache.eventmesh.common.remote.request.ReportPositionRequest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; + +@Component +@Slf4j +public class HttpPositionHandler extends PositionHandler { + @Autowired + EventMeshPositionReporterHistoryService historyService; + + @Override + protected DataSourceType getSourceType() { + return DataSourceType.HTTP; + } + + @Override + public boolean handler(ReportPositionRequest request, Metadata metadata) { + log.info("receive http position report request:{}", request); + // mock wemq postion report store + return true; + } + + @Override + public List handler(FetchPositionRequest request, Metadata metadata) { + // mock http position fetch request + List recordPositionList = new ArrayList<>(); + return recordPositionList; + } +} diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java index 74f208b199..9d648e0a72 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java @@ -35,6 +35,7 @@ public boolean reportVerifyRecord(ReportVerifyRequest request) { verify.setRecordSig(request.getRecordSig()); verify.setPosition(request.getPosition()); verify.setTaskID(request.getTaskID()); + verify.setJobID(request.getJobID()); verify.setConnectorName(request.getConnectorName()); verify.setConnectorStage(request.getConnectorStage()); return verifyService.save(verify); diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java new file mode 100644 index 0000000000..53d20f2ace --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.remote; + +import lombok.ToString; + +import java.util.HashMap; +import java.util.Map; + +@ToString +public enum JobState { + INIT, RUNNING, COMPLETE, DELETE, FAIL; + private static final JobState[] STATES_NUM_INDEX = JobState.values(); + private static final Map STATES_NAME_INDEX = new HashMap<>(); + static { + for (JobState jobState : STATES_NUM_INDEX) { + STATES_NAME_INDEX.put(jobState.name(), jobState); + } + } + + public static JobState fromIndex(Integer index) { + if (index == null || index < 0 || index >= STATES_NUM_INDEX.length) { + return null; + } + + return STATES_NUM_INDEX[index]; + } + + public static JobState fromIndex(String index) { + if (index == null || index.isEmpty()) { + return null; + } + + return STATES_NAME_INDEX.get(index); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java index cd541949f4..bd38881c3d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java @@ -28,6 +28,8 @@ public class ReportVerifyRequest extends BaseRemoteRequest { private String taskID; + private String jobID; + private String recordID; private String recordSig; diff --git a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload index 82d5c94dd3..433cf57ed1 100644 --- a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload +++ b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload @@ -16,6 +16,7 @@ org.apache.eventmesh.common.remote.request.FetchJobRequest org.apache.eventmesh.common.remote.response.FetchJobResponse org.apache.eventmesh.common.remote.request.ReportPositionRequest +org.apache.eventmesh.common.remote.request.ReportJobRequest org.apache.eventmesh.common.remote.request.ReportVerifyRequest org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest org.apache.eventmesh.common.remote.request.FetchPositionRequest diff --git a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java index 977661b134..993352a979 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java +++ b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java @@ -112,6 +112,8 @@ public void persist() { reportPositionRequest.setRecordPositionList(recordToSyncList); + log.debug("start report position request: {}", JsonUtils.toJSONString(reportPositionRequest)); + Metadata metadata = Metadata.newBuilder() .setType(ReportPositionRequest.class.getSimpleName()) .build(); @@ -121,6 +123,7 @@ public void persist() { .build()) .build(); requestObserver.onNext(payload); + log.debug("end report position request: {}", JsonUtils.toJSONString(reportPositionRequest)); for (Map.Entry entry : recordMap.entrySet()) { positionStore.remove(entry.getKey());