Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5067] Enhancement for eventmesh-admin-server #5068

Merged
merged 15 commits into from
Aug 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions eventmesh-admin-server/conf/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,11 @@ mybatis-plus:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
event-mesh:
admin-server:
service-name: DEFAULT_GROUP@@em_adm_server
port: 8081
serviceName: DEFAULT_GROUP@@em_adm_server
port: 8081
adminServerList:
region1:
- http://localhost:8081
region2:
- http://localhost:8082
region: region1
14 changes: 8 additions & 6 deletions eventmesh-admin-server/conf/eventmesh.sql
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`jobDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`transportType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`sourceData` int NOT NULL DEFAULT '0',
`targetData` int NOT NULL DEFAULT '0',
`state` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`jobState` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`jobType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`runningRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`updateUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
Expand Down Expand Up @@ -118,10 +119,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate',
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`taskName` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`taskDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`taskState` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate',
`sourceRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`targetRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
Expand Down
40 changes: 21 additions & 19 deletions eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,33 @@
-->

<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper">

<resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo">
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="jobID" column="jobID" jdbcType="VARCHAR"/>
<result property="desc" column="desc" jdbcType="VARCHAR"/>
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
<result property="transportType" column="transportType" jdbcType="VARCHAR"/>
<result property="sourceData" column="sourceData" jdbcType="INTEGER"/>
<result property="targetData" column="targetData" jdbcType="INTEGER"/>
<result property="state" column="state" jdbcType="VARCHAR"/>
<result property="jobType" column="jobType" jdbcType="VARCHAR"/>
<result property="fromRegion" column="fromRegion" jdbcType="VARCHAR"/>
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="jobID" column="jobID" jdbcType="VARCHAR"/>
<result property="jobDesc" column="desc" jdbcType="VARCHAR"/>
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
<result property="transportType" column="transportType" jdbcType="VARCHAR"/>
<result property="sourceData" column="sourceData" jdbcType="INTEGER"/>
<result property="targetData" column="targetData" jdbcType="INTEGER"/>
<result property="jobState" column="state" jdbcType="VARCHAR"/>
<result property="jobType" column="jobType" jdbcType="VARCHAR"/>
<result property="fromRegion" column="sourceRegion" jdbcType="VARCHAR"/>
<result property="runningRegion" column="targetRegion" jdbcType="VARCHAR"/>
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
</resultMap>

<sql id="Base_Column_List">
id,jobID,desc,
id,jobID,jobDesc,
taskID,transportType,sourceData,
targetData,state,jobType,
fromRegion,createTime,updateTime
targetData,jobState,jobType,
fromRegion,runningRegion,createUid,
updateUid,createTime,updateTime
</sql>
</mapper>
13 changes: 7 additions & 6 deletions eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@
<resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo">
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
<result property="name" column="name" jdbcType="VARCHAR"/>
<result property="desc" column="desc" jdbcType="VARCHAR"/>
<result property="state" column="state" jdbcType="VARCHAR"/>
<result property="fromRegion" column="fromRegion" jdbcType="VARCHAR"/>
<result property="taskName" column="taskName" jdbcType="VARCHAR"/>
<result property="taskDesc" column="taskDesc" jdbcType="VARCHAR"/>
<result property="taskState" column="taskState" jdbcType="VARCHAR"/>
<result property="sourceRegion" column="sourceRegion" jdbcType="VARCHAR"/>
<result property="targetRegion" column="targetRegion" jdbcType="VARCHAR"/>
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
</resultMap>

<sql id="Base_Column_List">
id,taskID,name,
desc,state,fromRegion,
id,taskID,taskName,
taskDesc,taskState,sourceRegion,targetRegion,
createUid,updateUid,createTime,
updateTime
</sql>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.eventmesh.admin.server;

import java.util.List;
import java.util.Map;

import org.springframework.boot.context.properties.ConfigurationProperties;

import lombok.Getter;
Expand All @@ -32,4 +35,6 @@ public class AdminServerProperties {
private String configurationPath;
private String configurationFile;
private String serviceName;
private Map<String, List<String>> adminServerList;
private String region;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.druid.support.json.JSONUtils;

@RestController
@RequestMapping("/eventmesh/admin")
public class HttpServer {
@Autowired
private TaskBizService taskService;

@RequestMapping("/createTask")
public ResponseEntity<Response<String>> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
@RequestMapping(value = "/createTask", method = RequestMethod.POST)
public ResponseEntity<Object> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
String uuid = taskService.createTask(task);
return ResponseEntity.ok(Response.success(uuid));
return ResponseEntity.ok(JSONUtils.toJSONString(Response.success(uuid)));
}

public boolean deleteTask(Long id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class EventMeshJobInfo implements Serializable {

private String jobID;

private String desc;
private String jobDesc;

private String taskID;

Expand All @@ -47,12 +47,16 @@ public class EventMeshJobInfo implements Serializable {

private Integer targetData;

private String state;
private String jobState;

private String jobType;

// job request from region
private String fromRegion;

// job actually running region
private String runningRegion;

private String createUid;

private String updateUid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ public class EventMeshTaskInfo implements Serializable {

private String taskID;

private String name;
private String taskName;

private String desc;
private String taskDesc;

private String state;
private String taskState;

private String fromRegion;
private String sourceRegion;

private String targetRegion;

private String createUid;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,31 @@

import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;

import java.util.List;

import org.springframework.transaction.annotation.Transactional;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
* etx operator for table event_mesh_job_info
*/
@Mapper
public interface EventMeshJobInfoExtMapper extends BaseMapper<EventMeshJobInfo> {
@Insert("insert into event_mesh_job_info(`taskID`,`state`,`jobType`) values"
+ "<foreach collection= 'jobs' item='job' separator=','>(#{job.taskID},#{job.state},#{job.jobType})</foreach>")
@Options(useGeneratedKeys = true, keyProperty = "jobID")

@Insert("<script>"
+ "insert into event_mesh_job_info(jobID, jobDesc, taskID, transportType, sourceData, "
+ "targetData, jobState, jobType, fromRegion, runningRegion, "
+ "createUid, updateUid) values"
+ "<foreach collection= 'jobs' item='job' separator=','>"
+ "(#{job.jobID}, #{job.jobDesc}, #{job.taskID}, #{job.transportType}, "
+ "#{job.sourceData}, #{job.targetData}, #{job.jobState}, #{job.jobType}, "
+ "#{job.fromRegion}, #{job.runningRegion}, #{job.createUid}, #{job.updateUid})"
+ "</foreach>"
+ "</script>")
@Transactional(rollbackFor = Exception.class)
int saveBatch(@Param("jobs") List<EventMeshJobInfo> jobInfoList);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.admin.server.web.db.service.impl;

import org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify;
import org.apache.eventmesh.admin.server.web.db.mapper.EventMeshVerifyMapper;
import org.apache.eventmesh.admin.server.web.db.service.EventMeshVerifyService;

import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

/**
* event_mesh_verify
*/
@Service
public class EventMeshVerifyServiceImpl extends ServiceImpl<EventMeshVerifyMapper, EventMeshVerify>
implements EventMeshVerifyService {

}




Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class JobDetail {

private String jobID;

private String desc;
private String jobDesc;

private String taskID;

Expand All @@ -50,7 +50,11 @@ public class JobDetail {

private String updateUid;

private String region;
// job request from region
private String fromRegion;

// job actually running region
private String runningRegion;

private DataSource sourceDataSource;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.admin.server.web.service.job;

import org.apache.eventmesh.admin.server.AdminServerProperties;
import org.apache.eventmesh.admin.server.AdminServerRuntimeException;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
Expand Down Expand Up @@ -70,12 +71,15 @@ public class JobInfoBizService {
@Autowired
private PositionBizService positionBizService;

@Autowired
private AdminServerProperties properties;

public boolean updateJobState(String jobID, TaskState state) {
if (jobID == null || state == null) {
return false;
}
EventMeshJobInfo jobInfo = new EventMeshJobInfo();
jobInfo.setState(state.name());
jobInfo.setJobState(state.name());
return jobInfoService.update(jobInfo, Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("state", TaskState.DELETE.name()));
}

Expand All @@ -86,34 +90,40 @@ public List<EventMeshJobInfo> createJobs(List<JobDetail> jobs) {
return null;
}
List<EventMeshJobInfo> entityList = new LinkedList<>();

for (JobDetail job : jobs) {
// if running region not equal with admin region continue
if (!job.getRunningRegion().equals(properties.getRegion())) {
continue;
}
EventMeshJobInfo entity = new EventMeshJobInfo();
entity.setState(TaskState.INIT.name());
entity.setJobState(TaskState.INIT.name());
entity.setTaskID(job.getTaskID());
entity.setJobType(job.getJobType().name());
entity.setDesc(job.getDesc());
entity.setJobDesc(job.getJobDesc());
String jobID = UUID.randomUUID().toString();
entity.setJobID(jobID);
entity.setTransportType(job.getTransportType().name());
entity.setCreateUid(job.getCreateUid());
entity.setUpdateUid(job.getUpdateUid());
entity.setFromRegion(job.getRegion());
entity.setFromRegion(job.getFromRegion());
entity.setRunningRegion(job.getRunningRegion());
CreateOrUpdateDataSourceReq source = new CreateOrUpdateDataSourceReq();
source.setType(job.getTransportType().getSrc());
source.setOperator(job.getCreateUid());
source.setRegion(job.getRegion());
source.setRegion(job.getSourceDataSource().getRegion());
source.setDesc(job.getSourceConnectorDesc());
source.setConfig(job.getSourceDataSource());
source.setConfig(job.getSourceDataSource().getConf());
EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source);
entity.setSourceData(createdSource.getId());

CreateOrUpdateDataSourceReq sink = new CreateOrUpdateDataSourceReq();
sink.setType(job.getTransportType().getDst());
sink.setOperator(job.getCreateUid());
sink.setRegion(job.getRegion());
sink.setRegion(job.getSinkDataSource().getRegion());
sink.setDesc(job.getSinkConnectorDesc());
sink.setConfig(job.getSinkDataSource());
EventMeshDataSource createdSink = dataSourceBizService.createDataSource(source);
sink.setConfig(job.getSinkDataSource().getConf());
EventMeshDataSource createdSink = dataSourceBizService.createDataSource(sink);
entity.setTargetData(createdSink.getId());

entityList.add(entity);
Expand Down Expand Up @@ -167,7 +177,7 @@ public JobDetail getJobDetail(String jobID) {
detail.setSinkConnectorDesc(target.getDescription());
}

TaskState state = TaskState.fromIndex(job.getState());
TaskState state = TaskState.fromIndex(job.getJobState());
if (state == null) {
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal job state in db");
}
Expand Down
Loading
Loading