diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java index b79ac5ae82..12afb3a3d4 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java @@ -19,6 +19,7 @@ import org.apache.eventmesh.admin.server.web.service.task.TaskBizService; import org.apache.eventmesh.common.remote.request.CreateTaskRequest; +import org.apache.eventmesh.common.remote.response.CreateTaskResponse; import org.apache.eventmesh.common.utils.JsonUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -37,8 +38,8 @@ public class HttpServer { @RequestMapping(value = "/createTask", method = RequestMethod.POST) public ResponseEntity createOrUpdateTask(@RequestBody CreateTaskRequest task) { - String uuid = taskService.createTask(task); - return ResponseEntity.ok(JsonUtils.toJSONString(Response.success(uuid))); + CreateTaskResponse createTaskResponse = taskService.createTask(task); + return ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse))); } public boolean deleteTask(Long id) { diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java index 7089f9cf76..7bc16ba4ac 100644 --- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java +++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/task/TaskBizService.java @@ -18,6 +18,8 @@ package org.apache.eventmesh.admin.server.web.service.task; import org.apache.eventmesh.admin.server.AdminServerProperties; +import org.apache.eventmesh.admin.server.web.Response; +import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo; import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService; import org.apache.eventmesh.admin.server.web.pojo.JobDetail; @@ -27,10 +29,12 @@ import org.apache.eventmesh.common.remote.datasource.DataSource; import org.apache.eventmesh.common.remote.datasource.DataSourceType; import org.apache.eventmesh.common.remote.request.CreateTaskRequest; +import org.apache.eventmesh.common.remote.response.CreateTaskResponse; import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.commons.lang3.StringUtils; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; @@ -55,8 +59,18 @@ public class TaskBizService { @Autowired private AdminServerProperties properties; + private static final String TYPE = "type"; + + private static final String DESC = "desc"; + + private static final String CONF_CLAZZ = "confClazz"; + + private static final String CONF = "conf"; + + private static final String REGION = "region"; + @Transactional - public String createTask(CreateTaskRequest req) { + public CreateTaskResponse createTask(CreateTaskRequest req) { String taskID = req.getTaskId(); if (StringUtils.isEmpty(taskID)) { taskID = UUID.randomUUID().toString(); @@ -64,8 +78,9 @@ public String createTask(CreateTaskRequest req) { } String targetRegion = req.getTargetRegion(); + String remoteResponse = ""; // not from other admin && target not equals with self region - if (!req.isFlag() && !StringUtils.equals(properties.getRegion(), targetRegion)) { + if (!req.isFlag() && !properties.getRegion().equals(targetRegion)) { List adminServerList = properties.getAdminServerList().get(targetRegion); if (adminServerList == null || adminServerList.isEmpty()) { throw new RuntimeException("No admin server available for region: " + targetRegion); @@ -78,6 +93,7 @@ public String createTask(CreateTaskRequest req) { if (!response.getStatusCode().is2xxSuccessful()) { throw new RuntimeException("Failed to create task on admin server: " + targetUrl); } + remoteResponse = response.getBody(); } String finalTaskID = taskID; @@ -93,7 +109,7 @@ public String createTask(CreateTaskRequest req) { job.setUpdateUid(req.getUid()); return job; }).collect(Collectors.toList()); - jobInfoService.createJobs(jobs); + EventMeshTaskInfo taskInfo = new EventMeshTaskInfo(); taskInfo.setTaskID(finalTaskID); taskInfo.setTaskName(req.getTaskName()); @@ -102,8 +118,9 @@ public String createTask(CreateTaskRequest req) { taskInfo.setCreateUid(req.getUid()); taskInfo.setSourceRegion(req.getSourceRegion()); taskInfo.setTargetRegion(req.getTargetRegion()); + List eventMeshJobInfoList = jobInfoService.createJobs(jobs); taskInfoService.save(taskInfo); - return finalTaskID; + return buildCreateTaskResponse(finalTaskID, eventMeshJobInfoList, remoteResponse); } private JobDetail parse(CreateTaskRequest.JobDetail src) throws ClassNotFoundException { @@ -111,29 +128,48 @@ private JobDetail parse(CreateTaskRequest.JobDetail src) throws ClassNotFoundExc dst.setJobDesc(src.getJobDesc()); dst.setTransportType(src.getTransportType()); dst.setSourceConnectorDesc(src.getSourceConnectorDesc()); - Map sourceDataMap = src.getSourceDataSource(); - DataSource sourceDataSource = new DataSource(); - sourceDataSource.setType(DataSourceType.fromString(sourceDataMap.get("type").toString())); - sourceDataSource.setDesc((String) sourceDataMap.get("desc")); - sourceDataSource.setConfClazz((Class) Class.forName(sourceDataMap.get("confClazz").toString())); - sourceDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sourceDataMap.get("conf")), sourceDataSource.getConfClazz())); - sourceDataSource.setRegion((String) sourceDataMap.get("region")); - dst.setSourceDataSource(sourceDataSource); - + try { + dst.setSourceDataSource(mapToDataSource(src.getSourceDataSource())); + dst.setSinkDataSource(mapToDataSource(src.getSinkDataSource())); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to map data source", e); + } dst.setSinkConnectorDesc(src.getSinkConnectorDesc()); - Map sinkDataMap = src.getSinkDataSource(); - DataSource sinkDataSource = new DataSource(); - sinkDataSource.setType(DataSourceType.fromString(sinkDataMap.get("type").toString())); - sinkDataSource.setDesc((String) sinkDataMap.get("desc")); - sinkDataSource.setConfClazz((Class) Class.forName(sinkDataMap.get("confClazz").toString())); - sinkDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sinkDataMap.get("conf")), sinkDataSource.getConfClazz())); - sinkDataSource.setRegion((String) sinkDataMap.get("region")); - dst.setSinkDataSource(sinkDataSource); - // full/increase/check dst.setJobType(src.getJobType()); dst.setFromRegion(src.getFromRegion()); dst.setRunningRegion(src.getRunningRegion()); return dst; } + + private DataSource mapToDataSource(Map dataMap) throws ClassNotFoundException { + DataSource dataSource = new DataSource(); + dataSource.setType(DataSourceType.fromString(dataMap.get(TYPE).toString())); + dataSource.setDesc((String) dataMap.get(DESC)); + dataSource.setConfClazz((Class) Class.forName(dataMap.get(CONF_CLAZZ).toString())); + dataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(dataMap.get(CONF)), dataSource.getConfClazz())); + dataSource.setRegion((String) dataMap.get(REGION)); + return dataSource; + } + + private CreateTaskResponse buildCreateTaskResponse(String taskId, List eventMeshJobInfoList, String remoteResponse) { + CreateTaskResponse createTaskResponse = new CreateTaskResponse(); + createTaskResponse.setTaskId(taskId); + List jobDetailList = new ArrayList<>(); + if (!eventMeshJobInfoList.isEmpty()) { + for (EventMeshJobInfo eventMeshJobInfo : eventMeshJobInfoList) { + CreateTaskRequest.JobDetail jobDetail = new CreateTaskRequest.JobDetail(); + jobDetail.setJobId(eventMeshJobInfo.getJobID()); + jobDetail.setRunningRegion(eventMeshJobInfo.getRunningRegion()); + jobDetailList.add(jobDetail); + } + } + if (!StringUtils.isEmpty(remoteResponse)) { + Response response = JsonUtils.parseObject(remoteResponse, Response.class); + CreateTaskResponse remoteCreateTaskResponse = JsonUtils.convertValue(response.getData(), CreateTaskResponse.class); + jobDetailList.addAll(remoteCreateTaskResponse.getJobIdList()); + } + createTaskResponse.setJobIdList(jobDetailList); + return createTaskResponse; + } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java index c895b5c440..b09a3e10ed 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/CreateTaskRequest.java @@ -56,6 +56,8 @@ public class CreateTaskRequest { @Data public static class JobDetail { + private String jobId; + private String jobDesc; // full/increase/check diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java index a6f5628d6f..11678dfcf0 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/CreateTaskResponse.java @@ -17,5 +17,17 @@ package org.apache.eventmesh.common.remote.response; +import org.apache.eventmesh.common.remote.request.CreateTaskRequest; + +import java.util.List; + +import lombok.Data; + +@Data public class CreateTaskResponse extends BaseRemoteResponse { + + private String taskId; + + private List jobIdList; + } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java index bf91957032..9e9cea304d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java @@ -54,6 +54,10 @@ public class JsonUtils { OBJECT_MAPPER.registerModule(new JavaTimeModule()); } + public static T convertValue(Object fromValue, Class toValueType) { + return OBJECT_MAPPER.convertValue(fromValue, toValueType); + } + public static T mapToObject(Map map, Class beanClass) { if (map == null) { return null;