Skip to content

Commit

Permalink
[ISSUE apache#5079] Enhancement update for admin-server
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Aug 20, 2024
1 parent 8cb8df5 commit 61fe517
Show file tree
Hide file tree
Showing 19 changed files with 370 additions and 53 deletions.
51 changes: 25 additions & 26 deletions eventmesh-admin-server/bin/start-admin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,34 +56,34 @@ function extract_java_version {
#}

function get_pid {
local ppid=""
if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
# If the process does not exist, it indicates that the previous process terminated abnormally.
local ppid=""
if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
# If the process does not exist, it indicates that the previous process terminated abnormally.
if [ ! -d /proc/$ppid ]; then
# Remove the residual file.
rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output."
ppid=""
fi
else
if [[ $OS =~ Msys ]]; then
# There is a Bug on Msys that may not be able to kill the identified process
ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
elif [[ $OS =~ Darwin ]]; then
# Known problem: grep Java may not be able to accurately identify Java processes
ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
else
if [ $DOCKER ]; then
# No need to exclude root user in Docker containers.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
else
else
if [[ $OS =~ Msys ]]; then
# There is a Bug on Msys that may not be able to kill the identified process
ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
elif [[ $OS =~ Darwin ]]; then
# Known problem: grep Java may not be able to accurately identify Java processes
ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
else
if [ $DOCKER ]; then
# No need to exclude root user in Docker containers.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
else
# It is required to identify the process as accurately as possible on Linux.
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'})
fi
fi
fi
echo "$ppid";
fi
fi
echo "$ppid";
}

#===========================================================================================
Expand Down Expand Up @@ -136,8 +136,7 @@ export JAVA_HOME

GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"

#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
JAVA_OPT="${JAVA_OPT} -verbose:gc"
if [[ "$JAVA_VERSION" == "8" ]]; then
Expand Down Expand Up @@ -172,7 +171,7 @@ JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"
# echo "proxy is running already"
# exit 9;
# else
# echo "err pid$pid, rm pid.file"
# echo "err pid$pid, rm pid.file"
# rm pid.file
# fi
#fi
Expand All @@ -183,8 +182,8 @@ if [[ $pid == "ERROR"* ]]; then
exit 9
fi
if [ -n "$pid" ]; then
echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
exit 9
echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
exit 9
fi

make_logs_dir
Expand All @@ -193,9 +192,9 @@ echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_H

EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
if [ $DOCKER ]; then
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
else
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
fi
exit 0
2 changes: 2 additions & 0 deletions eventmesh-admin-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ dependencies {
implementation "com.alibaba:druid-spring-boot-starter"
compileOnly 'com.mysql:mysql-connector-j'
compileOnly 'org.projectlombok:lombok'
testImplementation 'junit:junit:4.12'
testImplementation 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}

Expand Down
8 changes: 4 additions & 4 deletions eventmesh-admin-server/conf/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ event-mesh:
# grpc server port
port: 8081
adminServerList:
region1:
R1:
- http://localhost:8082
region2:
- http://localhost:8083
region: region1
R2:
- http://localhost:8082
region: R1
2 changes: 1 addition & 1 deletion eventmesh-admin-server/conf/eventmesh.sql
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
KEY `jobID` (`jobID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Expand Down Expand Up @@ -137,6 +136,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`recordID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`recordSig` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`connectorName` varchar(200) COLLATE utf8_bin DEFAULT NULL,
Expand Down
5 changes: 3 additions & 2 deletions eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify">
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
<result property="jobID" column="jobID" jdbcType="VARCHAR"/>
<result property="recordID" column="recordID" jdbcType="VARCHAR"/>
<result property="recordSig" column="recordSig" jdbcType="VARCHAR"/>
<result property="connectorName" column="connectorName" jdbcType="VARCHAR"/>
Expand All @@ -35,8 +36,8 @@
</resultMap>

<sql id="Base_Column_List">
id,taskID,recordID,
recordSig,connectorName,connectorStage,
id,taskID,jobID,recordID,
recordSig,connectorName,connectorStage,
position,createTime
</sql>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.eventmesh.admin.server.web;

import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.admin.server.web.service.task.TaskBizService;
import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
import org.apache.eventmesh.common.utils.JsonUtils;

Expand All @@ -31,17 +34,36 @@

@RestController
@RequestMapping("/eventmesh/admin")
@Slf4j
public class HttpServer {

@Autowired
private TaskBizService taskService;

@Autowired
private VerifyBizService verifyService;

@RequestMapping(value = "/createTask", method = RequestMethod.POST)
public ResponseEntity<Object> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
log.info("receive http proto create task:{}",task);
CreateTaskResponse createTaskResponse = taskService.createTask(task);
log.info("receive http proto create task result:{}",createTaskResponse);
return ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse)));
}


@RequestMapping(value = "/reportVerify", method = RequestMethod.POST)
public ResponseEntity<Object> reportVerify(@RequestBody ReportVerifyRequest request) {
log.info("receive http proto report verify request:{}", request);
boolean result = verifyService.reportVerifyRecord(request);
log.info("receive http proto report verify result:{}", result);
if (result) {
return ResponseEntity.ok("report verify success.request:" + JsonUtils.toJSONString(request));
} else {
return ResponseEntity.internalServerError().body("report verify success.request:" + JsonUtils.toJSONString(request));
}
}

public boolean deleteTask(Long id) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.eventmesh.common.EventMeshThreadFactory;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -39,22 +40,43 @@ public class DBThreadPool {
new LinkedBlockingQueue<>(1000), new EventMeshThreadFactory("admin-server-db"),
new ThreadPoolExecutor.DiscardOldestPolicy());


private final ScheduledThreadPoolExecutor checkScheduledExecutor =
new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new EventMeshThreadFactory("admin-server-check-scheduled"),
new ThreadPoolExecutor.DiscardOldestPolicy());

@PreDestroy
private void destroy() {
if (!executor.isShutdown()) {
try {
executor.shutdown();
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
log.info("wait heart beat handler thread pool shutdown timeout, it will shutdown immediately");
log.info("wait handler thread pool shutdown timeout, it will shutdown immediately");
executor.shutdownNow();
}
} catch (InterruptedException e) {
log.warn("wait heart beat handler thread pool shutdown fail");
log.warn("wait handler thread pool shutdown fail");
}
}

if (!checkScheduledExecutor.isShutdown()) {
try {
checkScheduledExecutor.shutdown();
if (!checkScheduledExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
log.info("wait scheduled thread pool shutdown timeout, it will shutdown immediately");
checkScheduledExecutor.shutdownNow();
}
} catch (InterruptedException e) {
log.warn("wait scheduled thread pool shutdown fail");
}
}
}

public ThreadPoolExecutor getExecutors() {
return executor;
}

public ScheduledThreadPoolExecutor getCheckExecutor() {
return checkScheduledExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class EventMeshVerify implements Serializable {

private String taskID;

private String jobID;

private String recordID;

private String recordSig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) {
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
config.setSinkConnectorDesc(detail.getSinkConnectorDesc());
response.setConnectorConfig(config);
response.setTransportType(detail.getTransportType());
response.setState(detail.getState());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.admin.server.web.handler.impl;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.request.ReportJobRequest;
import org.apache.eventmesh.common.remote.response.SimpleResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReportJobRequestHandler extends BaseRequestHandler<ReportJobRequest, SimpleResponse> {

@Autowired
JobInfoBizService jobInfoBizService;

@Override
public SimpleResponse handler(ReportJobRequest request, Metadata metadata) {
log.info("receive report job request:{}", request);
if (StringUtils.isBlank(request.getJobID())) {
return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty");
}
EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(request.getJobID());
if (jobInfo == null) {
return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, not exist target job,jobID:" + request.getJobID());
}
boolean result = jobInfoBizService.updateJobState(jobInfo.getJobID(), request.getState());
if (result) {
return SimpleResponse.success();
} else {
return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "update job failed.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ReportPositionHandler extends BaseRequestHandler<ReportPositionRequ

@Override
protected SimpleResponse handler(ReportPositionRequest request, Metadata metadata) {
log.info("receive report position request:{}", request);
if (StringUtils.isBlank(request.getJobID())) {
log.info("request [{}] illegal job id", request);
return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, it's empty");
Expand Down
Loading

0 comments on commit 61fe517

Please sign in to comment.