diff --git a/openjob-common/pom.xml b/openjob-common/pom.xml index f1ac8062..a97d835f 100644 --- a/openjob-common/pom.xml +++ b/openjob-common/pom.xml @@ -5,7 +5,7 @@ openjob io.openjob - 1.0.7 + 1.0.8 4.0.0 openjob-common diff --git a/openjob-common/src/test/java/io/openjob/common/util/DateUtilTest.java b/openjob-common/src/test/java/io/openjob/common/util/DateUtilTest.java index db8a65e9..0577a8d5 100644 --- a/openjob-common/src/test/java/io/openjob/common/util/DateUtilTest.java +++ b/openjob-common/src/test/java/io/openjob/common/util/DateUtilTest.java @@ -15,7 +15,7 @@ public class DateUtilTest { @Test public void testFormatDateByTimestamp() { - Integer dateTime = DateUtil.formatDateByTimestamp(DateUtil.timestamp()); + Integer dateTime = DateUtil.formatDateByTimestamp(DateUtil.timestamp()* 1000); Integer HourTime = DateUtil.formatHourByTimestamp(DateUtil.timestamp()); Assertions.assertNotNull(dateTime); diff --git a/openjob-server/openjob-server-admin/pom.xml b/openjob-server/openjob-server-admin/pom.xml index 7c5c3948..d5ec4d48 100644 --- a/openjob-server/openjob-server-admin/pom.xml +++ b/openjob-server/openjob-server-admin/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.7 + 1.0.8 4.0.0 openjob-server-admin diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/DeleteJobRequest.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/DeleteJobRequest.java index 4899499d..99d259f8 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/DeleteJobRequest.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/DeleteJobRequest.java @@ -6,7 +6,7 @@ import javax.validation.constraints.NotNull; /** - * @author zhenghongyang + * @author zhenghongyang sakuraovq@gmail.com * @since 1.0.0 */ @Data diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/ListJobRequest.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/ListJobRequest.java index 0e226428..09072447 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/ListJobRequest.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/ListJobRequest.java @@ -9,7 +9,7 @@ import javax.validation.constraints.NotNull; /** - * @author zhenghongyang + * @author zhenghongyang sakuraovq@gmail.com * @since 1.0.0 */ @Data diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/UpdateJobStatusRequest.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/UpdateJobStatusRequest.java index 55a6f61e..ce95d446 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/UpdateJobStatusRequest.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/request/job/UpdateJobStatusRequest.java @@ -6,7 +6,7 @@ import javax.validation.constraints.NotNull; /** - * @author zhenghongyang + * @author zhenghongyang sakuraovq@gmail.com * @since 1.0.0 */ @Data diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/JobService.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/JobService.java index d387825f..8963abf0 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/JobService.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/JobService.java @@ -17,7 +17,7 @@ import io.openjob.server.common.vo.PageVO; /** - * @author zhenghongyang + * @author zhenghongyang sakuraovq@gmail.com * @since 1.0.0 */ public interface JobService { diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java index de4b6031..b476a185 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java @@ -34,7 +34,7 @@ import java.util.stream.Collectors; /** - * @author zhenghongyang + * @author zhenghongyang sakuraovq@gmail.com * @since 1.0.0 */ @Service diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java index 08ceac41..7bdd1089 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java @@ -66,7 +66,7 @@ import java.util.stream.Collectors; /** - * @author zhenghongyang + * @author zhenghongyang sakuraovq@gmail.com * @since 1.0.0 */ @Slf4j diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/vo/job/ListJobVO.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/vo/job/ListJobVO.java index 98cd52c2..65845ea6 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/vo/job/ListJobVO.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/vo/job/ListJobVO.java @@ -4,7 +4,7 @@ import lombok.Data; /** - * @author zhenghongyang + * @author zhenghongyang sakuraovq@gmail.com * @since 1.0.0 */ @Data diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/vo/job/UpdateJobStatusVO.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/vo/job/UpdateJobStatusVO.java index f8438861..4f16a97f 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/vo/job/UpdateJobStatusVO.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/vo/job/UpdateJobStatusVO.java @@ -3,7 +3,7 @@ import lombok.Data; /** - * @author zhenghongyang + * @author zhenghongyang sakuraovq@gmail.com * @since 1.0.0 */ @Data diff --git a/openjob-server/openjob-server-alarm/pom.xml b/openjob-server/openjob-server-alarm/pom.xml index 47e0fe99..356dd637 100644 --- a/openjob-server/openjob-server-alarm/pom.xml +++ b/openjob-server/openjob-server-alarm/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.7 + 1.0.8 4.0.0 openjob-server-alarm diff --git a/openjob-server/openjob-server-cluster/pom.xml b/openjob-server/openjob-server-cluster/pom.xml index 0d79e4b1..7cdeb828 100644 --- a/openjob-server/openjob-server-cluster/pom.xml +++ b/openjob-server/openjob-server-cluster/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.7 + 1.0.8 4.0.0 diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/ClusterServer.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/ClusterServer.java index 6fe434c8..f845c852 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/ClusterServer.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/ClusterServer.java @@ -8,7 +8,7 @@ import akka.routing.RoundRobinPool; import com.typesafe.config.Config; import io.openjob.common.constant.AkkaConstant; -import io.openjob.server.cluster.manager.FailManager; +import io.openjob.server.cluster.common.FailCommon; import io.openjob.server.cluster.service.ClusterService; import io.openjob.server.cluster.service.JoinService; import io.openjob.server.common.ClusterContext; @@ -29,16 +29,16 @@ public class ClusterServer { private final ActorSystem actorSystem; private final SchedulerProperties schedulerProperties; private final JoinService joinService; - private final FailManager failManager; + private final FailCommon failCommon; private final ClusterService clusterService; @Autowired - public ClusterServer(ActorSystem actorSystem, SchedulerProperties schedulerProperties, JoinService joinService, FailManager failManager, ClusterService clusterService) { + public ClusterServer(ActorSystem actorSystem, SchedulerProperties schedulerProperties, JoinService joinService, FailCommon failCommon, ClusterService clusterService) { this.actorSystem = actorSystem; this.schedulerProperties = schedulerProperties; this.joinService = joinService; - this.failManager = failManager; + this.failCommon = failCommon; this.clusterService = clusterService; } @@ -148,7 +148,7 @@ private void registerCoordinatedShutdown() { .addTask(CoordinatedShutdown.PhaseServiceUnbind(), "coordinated-shutdown-hook", () -> { - this.failManager.shutdown(ClusterContext.getCurrentNode()); + this.failCommon.shutdown(ClusterContext.getCurrentNode()); return Future.successful(Done.done()); }); } diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/FailManager.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/common/FailCommon.java similarity index 96% rename from openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/FailManager.java rename to openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/common/FailCommon.java index 4dd0ba02..6b1be1c0 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/FailManager.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/common/FailCommon.java @@ -1,4 +1,4 @@ -package io.openjob.server.cluster.manager; +package io.openjob.server.cluster.common; import com.google.common.collect.Maps; import io.openjob.common.OpenjobSpringContext; @@ -33,7 +33,7 @@ */ @Slf4j @Component -public class FailManager { +public class FailCommon { private final ServerDAO serverDAO; private final JobSlotsDAO jobSlotsDAO; private final ClusterProperties clusterProperties; @@ -41,7 +41,7 @@ public class FailManager { private final Scheduler scheduler; @Autowired - public FailManager(ServerDAO serverDAO, JobSlotsDAO jobSlotsDAO, ClusterProperties clusterProperties, RefreshData refreshData, Scheduler scheduler) { + public FailCommon(ServerDAO serverDAO, JobSlotsDAO jobSlotsDAO, ClusterProperties clusterProperties, RefreshData refreshData, Scheduler scheduler) { this.serverDAO = serverDAO; this.jobSlotsDAO = jobSlotsDAO; this.clusterProperties = clusterProperties; diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JoinManager.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/common/JoinCommon.java similarity index 96% rename from openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JoinManager.java rename to openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/common/JoinCommon.java index 6d829816..6212f300 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JoinManager.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/common/JoinCommon.java @@ -1,4 +1,4 @@ -package io.openjob.server.cluster.manager; +package io.openjob.server.cluster.common; import com.google.common.collect.Maps; import io.openjob.common.OpenjobSpringContext; @@ -29,7 +29,7 @@ */ @Slf4j @Component -public class JoinManager { +public class JoinCommon { private final ServerDAO serverDAO; private final JobSlotsDAO jobSlotsDAO; private final RefreshData refreshData; @@ -39,7 +39,7 @@ public class JoinManager { * Refresh status. */ @Autowired - public JoinManager(ServerDAO serverDAO, JobSlotsDAO jobSlotsDAO, RefreshData refreshData, ClusterProperties clusterProperties) { + public JoinCommon(ServerDAO serverDAO, JobSlotsDAO jobSlotsDAO, RefreshData refreshData, ClusterProperties clusterProperties) { this.serverDAO = serverDAO; this.jobSlotsDAO = jobSlotsDAO; this.refreshData = refreshData; diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerHeartbeatReqDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerHeartbeatReqDTO.java new file mode 100644 index 00000000..a2ba9d1c --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerHeartbeatReqDTO.java @@ -0,0 +1,25 @@ +package io.openjob.server.cluster.dto; + +import lombok.Data; + +import java.util.List; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerHeartbeatReqDTO { + private Long appId; + + private String appName; + + private String address; + + private String version; + + /** + * Running job instance ids. + */ + private List runningJobInstanceIds; +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerHeartbeatRespDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerHeartbeatRespDTO.java new file mode 100644 index 00000000..dc5d854d --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerHeartbeatRespDTO.java @@ -0,0 +1,28 @@ +package io.openjob.server.cluster.dto; + +import lombok.Data; + +import java.util.Set; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerHeartbeatRespDTO { + + /** + * Worker address list. + */ + private Set workerAddressList; + + /** + * Cluster version + */ + private Long clusterVersion; + + /** + * Cluster delay version + */ + private Long clusterDelayVersion; +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceStatusReqDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceStatusReqDTO.java new file mode 100644 index 00000000..7ae79b1f --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceStatusReqDTO.java @@ -0,0 +1,47 @@ +package io.openjob.server.cluster.dto; + +import io.openjob.common.constant.FailStatusEnum; +import io.openjob.common.constant.InstanceStatusEnum; +import lombok.Data; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerJobInstanceStatusReqDTO { + /** + * Job id. + */ + private Long jobId; + + /** + * Job instance id. + */ + private Long jobInstanceId; + + /** + * Current circleId. + * Only for second delay task. + */ + private Long circleId; + + /** + * Job instance status. + * + * @see InstanceStatusEnum + */ + private Integer status; + + /** + * Fail status + * + * @see FailStatusEnum#getStatus() + */ + private Integer failStatus; + + /** + * Result + */ + private String result; +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceStatusRespDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceStatusRespDTO.java new file mode 100644 index 00000000..c1b5e58d --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceStatusRespDTO.java @@ -0,0 +1,11 @@ +package io.openjob.server.cluster.dto; + +import lombok.Data; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerJobInstanceStatusRespDTO { +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskBatchReqDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskBatchReqDTO.java new file mode 100644 index 00000000..da386134 --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskBatchReqDTO.java @@ -0,0 +1,80 @@ +package io.openjob.server.cluster.dto; + +import lombok.Data; + +import java.util.List; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerJobInstanceTaskBatchReqDTO { + private List taskRequestList; + + @Data + public static class WorkerJobInstanceTaskReqDTO { + /** + * Job id. + */ + private Long jobId; + + /** + * Job instance id. + */ + private Long jobInstanceId; + + /** + * Dispatch version + */ + private Long dispatchVersion; + + /** + * Only for second delay task. + */ + private Long circleId; + + /** + * Task unique id. + * like 'jobId_instanceId_taskId_circleId' + */ + private String taskId; + + /** + * Task parent unique id. + * like 'jobId_instanceId_taskId_circleId' + */ + private String parentTaskId; + + /** + * Task name. + * Only for map reduce task. + */ + private String taskName; + + /** + * Task status. + */ + private Integer status; + + /** + * Task result. + */ + private String result; + + /** + * Task worker address + */ + private String workerAddress; + + /** + * Task create time. + */ + private Long createTime; + + /** + * Task update time. + */ + private Long updateTime; + } +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskBatchRespDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskBatchRespDTO.java new file mode 100644 index 00000000..9ddc161c --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskBatchRespDTO.java @@ -0,0 +1,11 @@ +package io.openjob.server.cluster.dto; + +import lombok.Data; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerJobInstanceTaskBatchRespDTO { +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskLogReqDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskLogReqDTO.java new file mode 100644 index 00000000..05769919 --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskLogReqDTO.java @@ -0,0 +1,20 @@ +package io.openjob.server.cluster.dto; + +import lombok.Data; + +import java.util.List; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerJobInstanceTaskLogReqDTO { + private List> fieldList; + + @Data + public static class WorkerJobInstanceTaskLogFieldReqDTO { + private String name; + private String value; + } +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskLogRespDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskLogRespDTO.java new file mode 100644 index 00000000..ba8c49de --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerJobInstanceTaskLogRespDTO.java @@ -0,0 +1,11 @@ +package io.openjob.server.cluster.dto; + +import lombok.Data; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerJobInstanceTaskLogRespDTO { +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStartReqDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStartReqDTO.java new file mode 100644 index 00000000..4b4e748b --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStartReqDTO.java @@ -0,0 +1,20 @@ +package io.openjob.server.cluster.dto; + +import lombok.Data; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerStartReqDTO { + private String workerKey; + + private String appName; + + private String address; + + private String version; + + private String protocolType; +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStartRespDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStartRespDTO.java new file mode 100644 index 00000000..902ed22a --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStartRespDTO.java @@ -0,0 +1,16 @@ +package io.openjob.server.cluster.dto; + +import lombok.Data; + +import java.util.Set; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerStartRespDTO { + private Long appId; + private String appName; + private Set workerAddressList; +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStopReqDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStopReqDTO.java new file mode 100644 index 00000000..3a93556e --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStopReqDTO.java @@ -0,0 +1,14 @@ +package io.openjob.server.cluster.dto; + +import lombok.Data; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerStopReqDTO { + private String workerKey; + private String appName; + private String address; +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStopRespDTO.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStopRespDTO.java new file mode 100644 index 00000000..d2d53905 --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/dto/WorkerStopRespDTO.java @@ -0,0 +1,11 @@ +package io.openjob.server.cluster.dto; + +import lombok.Data; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerStopRespDTO { +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java index bc610495..7ae4583f 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java @@ -1,7 +1,7 @@ package io.openjob.server.cluster.executor; -import io.openjob.common.request.WorkerHeartbeatRequest; import io.openjob.common.task.TaskQueue; +import io.openjob.server.cluster.dto.WorkerHeartbeatReqDTO; import io.openjob.server.cluster.task.WorkerHeartConsumer; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -13,7 +13,7 @@ @Slf4j @Component public class WorkerHeartbeatExecutor { - private final TaskQueue queue; + private final TaskQueue queue; /** * New @@ -39,7 +39,7 @@ public WorkerHeartbeatExecutor() { * * @param request request */ - public void submit(WorkerHeartbeatRequest request) { + public void submit(WorkerHeartbeatReqDTO request) { try { this.queue.submit(request); } catch (InterruptedException e) { diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceExecutor.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceExecutor.java index 5da56614..7abf2d00 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceExecutor.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceExecutor.java @@ -1,7 +1,7 @@ package io.openjob.server.cluster.executor; -import io.openjob.common.request.WorkerJobInstanceStatusRequest; import io.openjob.common.task.TaskQueue; +import io.openjob.server.cluster.dto.WorkerJobInstanceStatusReqDTO; import io.openjob.server.cluster.task.WorkerJobInstanceConsumer; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -13,7 +13,7 @@ @Slf4j @Component public class WorkerJobInstanceExecutor { - private final TaskQueue queue; + private final TaskQueue queue; /** * New @@ -39,7 +39,7 @@ public WorkerJobInstanceExecutor() { * * @param request request */ - public void submit(WorkerJobInstanceStatusRequest request) { + public void submit(WorkerJobInstanceStatusReqDTO request) { try { this.queue.submit(request); } catch (InterruptedException e) { diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceTaskExecutor.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceTaskExecutor.java index 240710e5..b6c799a9 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceTaskExecutor.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceTaskExecutor.java @@ -1,7 +1,7 @@ package io.openjob.server.cluster.executor; -import io.openjob.common.request.WorkerJobInstanceTaskBatchRequest; import io.openjob.common.task.TaskQueue; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskBatchReqDTO; import io.openjob.server.cluster.task.WorkerJobInstanceTaskConsumer; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -13,7 +13,7 @@ @Slf4j @Component public class WorkerJobInstanceTaskExecutor { - private final TaskQueue queue; + private final TaskQueue queue; /** * New @@ -39,7 +39,7 @@ public WorkerJobInstanceTaskExecutor() { * * @param request request */ - public void submit(WorkerJobInstanceTaskBatchRequest request) { + public void submit(WorkerJobInstanceTaskBatchReqDTO request) { try { this.queue.submit(request); } catch (InterruptedException e) { diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerTaskLogExecutor.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerTaskLogExecutor.java index c25a7c99..2168f42e 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerTaskLogExecutor.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerTaskLogExecutor.java @@ -1,7 +1,7 @@ package io.openjob.server.cluster.executor; -import io.openjob.common.request.WorkerJobInstanceTaskLogRequest; import io.openjob.common.task.TaskQueue; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskLogReqDTO; import io.openjob.server.cluster.task.WorkerTaskLogConsumer; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -13,7 +13,7 @@ @Slf4j @Component public class WorkerTaskLogExecutor { - private final TaskQueue queue; + private final TaskQueue queue; /** * New @@ -39,7 +39,7 @@ public WorkerTaskLogExecutor() { * * @param request request */ - public void submit(WorkerJobInstanceTaskLogRequest request) { + public void submit(WorkerJobInstanceTaskLogReqDTO request) { try { this.queue.submit(request); } catch (InterruptedException e) { diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JobInstanceManager.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JobInstanceManager.java new file mode 100644 index 00000000..426f92f6 --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JobInstanceManager.java @@ -0,0 +1,124 @@ +package io.openjob.server.cluster.manager; + +import io.openjob.common.constant.CommonConstant; +import io.openjob.common.constant.FailStatusEnum; +import io.openjob.common.constant.InstanceStatusEnum; +import io.openjob.server.alarm.constant.AlarmEventEnum; +import io.openjob.server.alarm.dto.AlarmEventDTO; +import io.openjob.server.alarm.event.AlarmEvent; +import io.openjob.server.alarm.event.AlarmEventPublisher; +import io.openjob.server.cluster.dto.WorkerJobInstanceStatusReqDTO; +import io.openjob.server.cluster.dto.WorkerJobInstanceStatusRespDTO; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskBatchReqDTO; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskBatchRespDTO; +import io.openjob.server.cluster.executor.WorkerJobInstanceExecutor; +import io.openjob.server.cluster.executor.WorkerJobInstanceTaskExecutor; +import io.openjob.server.repository.dao.JobInstanceDAO; +import io.openjob.server.repository.dao.JobInstanceTaskDAO; +import io.openjob.server.repository.entity.JobInstanceTask; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.stereotype.Component; +import org.springframework.transaction.UnexpectedRollbackException; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Slf4j +@Component +public class JobInstanceManager { + private final JobInstanceTaskDAO jobInstanceTaskDAO; + private final JobInstanceDAO jobInstanceDAO; + private final WorkerJobInstanceExecutor workerJobInstanceExecutor; + private final WorkerJobInstanceTaskExecutor workerJobInstanceTaskExecutor; + + @Autowired + public JobInstanceManager(JobInstanceTaskDAO jobInstanceTaskDAO, + JobInstanceDAO jobInstanceDAO, + WorkerJobInstanceExecutor workerJobInstanceExecutor, + WorkerJobInstanceTaskExecutor workerJobInstanceTaskExecutor) { + this.jobInstanceTaskDAO = jobInstanceTaskDAO; + this.jobInstanceDAO = jobInstanceDAO; + this.workerJobInstanceExecutor = workerJobInstanceExecutor; + this.workerJobInstanceTaskExecutor = workerJobInstanceTaskExecutor; + } + + public WorkerJobInstanceStatusRespDTO handleInstanceStatus(WorkerJobInstanceStatusReqDTO statusRequest) { + this.workerJobInstanceExecutor.submit(statusRequest); + return new WorkerJobInstanceStatusRespDTO(); + } + + @Transactional(rollbackFor = Exception.class) + public void handleConsumerInstanceStatus(List statusList) { + // Update status + statusList.forEach(s -> { + this.jobInstanceDAO.updateStatusById(s.getJobInstanceId(), s.getStatus(), s.getFailStatus()); + this.addAlarmEvent(s); + }); + } + + public WorkerJobInstanceTaskBatchRespDTO handleInstanceTasks(WorkerJobInstanceTaskBatchReqDTO taskBatchRequest) { + this.workerJobInstanceTaskExecutor.submit(taskBatchRequest); + return new WorkerJobInstanceTaskBatchRespDTO(); + } + + /** + * Handle instance status. + * + * @param taskList task list + */ + @Transactional(rollbackFor = Exception.class) + public void handleConsumerInstanceTasks(List taskList) { + List saveList = new ArrayList<>(); + taskList.forEach(l -> l.getTaskRequestList().forEach(t -> { + JobInstanceTask jobInstanceTask = new JobInstanceTask(); + jobInstanceTask.setJobId(t.getJobId()); + jobInstanceTask.setJobInstanceId(t.getJobInstanceId()); + jobInstanceTask.setDispatchVersion(t.getDispatchVersion()); + jobInstanceTask.setCircleId(t.getCircleId()); + jobInstanceTask.setTaskId(t.getTaskId()); + jobInstanceTask.setParentTaskId(t.getParentTaskId()); + jobInstanceTask.setTaskName(t.getTaskName()); + jobInstanceTask.setStatus(t.getStatus()); + jobInstanceTask.setResult(t.getResult()); + jobInstanceTask.setWorkerAddress(t.getWorkerAddress()); + jobInstanceTask.setDeleted(CommonConstant.NO); + jobInstanceTask.setDeleteTime(0L); + jobInstanceTask.setCreateTime(t.getCreateTime()); + jobInstanceTask.setUpdateTime(t.getUpdateTime()); + saveList.add(jobInstanceTask); + })); + + try { + this.jobInstanceTaskDAO.batchSave(saveList); + } catch (DataIntegrityViolationException | UnexpectedRollbackException exception) { + log.warn("Data has been saved! {}", taskList); + } + } + + protected void addAlarmEvent(WorkerJobInstanceStatusReqDTO statusRequest) { + if (InstanceStatusEnum.isFailed(statusRequest.getStatus())) { + AlarmEventDTO alarmEventDTO = new AlarmEventDTO(); + alarmEventDTO.setJobUniqueId(String.valueOf(statusRequest.getJobId())); + alarmEventDTO.setInstanceId(String.valueOf(statusRequest.getJobInstanceId())); + + // Fail status + if (FailStatusEnum.isExecuteTimeout(statusRequest.getFailStatus())) { + alarmEventDTO.setName(AlarmEventEnum.JOB_EXECUTE_TIMEOUT.getEvent()); + } else { + alarmEventDTO.setName(AlarmEventEnum.JOB_EXECUTE_FAIL.getEvent()); + } + + // Event message + alarmEventDTO.setMessage(Optional.ofNullable(statusRequest.getResult()).orElse("")); + AlarmEventPublisher.publishEvent(new AlarmEvent(alarmEventDTO)); + } + } +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JobInstanceTaskLogManager.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JobInstanceTaskLogManager.java new file mode 100644 index 00000000..b7f63406 --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JobInstanceTaskLogManager.java @@ -0,0 +1,89 @@ +package io.openjob.server.cluster.manager; + +import com.google.common.collect.Lists; +import io.openjob.common.constant.LogFieldConstant; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskLogReqDTO; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskLogRespDTO; +import io.openjob.server.cluster.executor.WorkerTaskLogExecutor; +import io.openjob.server.common.util.BeanMapperUtil; +import io.openjob.server.log.dao.LogDAO; +import io.openjob.server.log.dto.ProcessorLogDTO; +import io.openjob.server.log.dto.ProcessorLogFieldDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Slf4j +@Component +public class JobInstanceTaskLogManager { + private final LogDAO logDAO; + private final WorkerTaskLogExecutor executor; + + @Autowired + public JobInstanceTaskLogManager(LogDAO logDAO, WorkerTaskLogExecutor executor) { + this.logDAO = logDAO; + this.executor = executor; + } + + /** + * Handle instance log. + * + * @param logReq log request. + */ + public WorkerJobInstanceTaskLogRespDTO handleInstanceTaskLog(WorkerJobInstanceTaskLogReqDTO logReq) { + this.executor.submit(logReq); + return new WorkerJobInstanceTaskLogRespDTO(); + } + + /** + * Batch instance log + * + * @param requests requests + */ + public void batchInstanceTaskLog(List requests) { + List processorLogList = requests.stream().flatMap(r -> r.getFieldList().stream().map(fields -> { + // Field map. + Map> fieldMap = fields.stream() + .collect(Collectors.groupingBy(WorkerJobInstanceTaskLogReqDTO.WorkerJobInstanceTaskLogFieldReqDTO::getName)); + + // Task id. + String taskId = Optional.ofNullable(fieldMap.get(LogFieldConstant.TASK_ID)) + .orElseGet(() -> Lists.newArrayList(new WorkerJobInstanceTaskLogReqDTO.WorkerJobInstanceTaskLogFieldReqDTO())) + .get(0).getValue(); + + // Worker address. + String workerAddress = Optional.ofNullable(fieldMap.get(LogFieldConstant.WORKER_ADDRESS)) + .orElseGet(() -> Lists.newArrayList(new WorkerJobInstanceTaskLogReqDTO.WorkerJobInstanceTaskLogFieldReqDTO())) + .get(0).getValue(); + + // Time + String timeStamp = Optional.ofNullable(fieldMap.get(LogFieldConstant.TIME_STAMP)) + .orElseGet(() -> Lists.newArrayList(new WorkerJobInstanceTaskLogReqDTO.WorkerJobInstanceTaskLogFieldReqDTO())) + .get(0).getValue(); + + ProcessorLogDTO processorLog = new ProcessorLogDTO(); + processorLog.setTaskId(taskId); + processorLog.setWorkerAddress(workerAddress); + processorLog.setTime(Long.valueOf(timeStamp)); + processorLog.setFields(BeanMapperUtil.mapList(fields, WorkerJobInstanceTaskLogReqDTO.WorkerJobInstanceTaskLogFieldReqDTO.class, ProcessorLogFieldDTO.class)); + return processorLog; + })).collect(Collectors.toList()); + + try { + logDAO.batchAdd(processorLogList); + } catch (Exception e) { + log.error("Batch add task log failed!", e); + throw new RuntimeException(e); + } + + } +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/WorkerHeartbeatManager.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/WorkerHeartbeatManager.java new file mode 100644 index 00000000..ff964cd1 --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/WorkerHeartbeatManager.java @@ -0,0 +1,98 @@ +package io.openjob.server.cluster.manager; + +import io.openjob.common.request.WorkerHeartbeatRequest; +import io.openjob.common.response.ServerHeartbeatResponse; +import io.openjob.common.response.ServerHeartbeatSystemResponse; +import io.openjob.common.util.DateUtil; +import io.openjob.server.cluster.dto.WorkerHeartbeatReqDTO; +import io.openjob.server.cluster.dto.WorkerHeartbeatRespDTO; +import io.openjob.server.cluster.executor.WorkerHeartbeatExecutor; +import io.openjob.server.cluster.util.ClusterUtil; +import io.openjob.server.common.ClusterContext; +import io.openjob.server.common.dto.SystemDTO; +import io.openjob.server.repository.dao.JobInstanceDAO; +import io.openjob.server.repository.dao.WorkerDAO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Slf4j +@Component +public class WorkerHeartbeatManager { + private final WorkerDAO workerDAO; + private final JobInstanceDAO jobInstanceDAO; + private final WorkerHeartbeatExecutor workerHeartbeatExecutor; + + @Autowired + public WorkerHeartbeatManager(WorkerDAO workerDAO, + JobInstanceDAO jobInstanceDAO, + WorkerHeartbeatExecutor workerHeartbeatExecutor) { + this.workerDAO = workerDAO; + this.jobInstanceDAO = jobInstanceDAO; + this.workerHeartbeatExecutor = workerHeartbeatExecutor; + } + + /** + * Worker heartbeat + * + * @param heartbeatReq heartbeat request. + */ + public WorkerHeartbeatRespDTO workerHeartbeat(WorkerHeartbeatReqDTO heartbeatReq) { + //Submit request + this.workerHeartbeatExecutor.submit(heartbeatReq); + + + // System information. + SystemDTO system = ClusterContext.getSystem(); + + // Online workers and exclude start worker. + Set onlineWorkers = ClusterUtil.getOnlineWorkers(heartbeatReq.getAppId()); + + WorkerHeartbeatRespDTO respDTO = new WorkerHeartbeatRespDTO(); + respDTO.setClusterVersion(system.getClusterVersion()); + respDTO.setClusterDelayVersion(system.getClusterDelayVersion()); + respDTO.setWorkerAddressList(onlineWorkers); + return respDTO; + } + + /** + * Batch heartbeat + * + * @param requests requests + */ + @Transactional(rollbackFor = Exception.class) + public void batchHeartbeat(List requests) { + Long timestamp = DateUtil.timestamp(); + Set addressList = new HashSet<>(); + Set instanceIds = new HashSet<>(); + + // Merge requests + requests.forEach(r -> { + addressList.add(r.getAddress()); + if (!CollectionUtils.isEmpty(r.getRunningJobInstanceIds())) { + instanceIds.addAll(r.getRunningJobInstanceIds()); + } + }); + + // Worker heartbeat + if (!CollectionUtils.isEmpty(addressList)) { + this.workerDAO.updateLastHeartbeatTimeByAddresses(new ArrayList<>(addressList), timestamp); + } + + // Instance last report time. + if (!CollectionUtils.isEmpty(instanceIds)) { + this.jobInstanceDAO.updateLastReportTimeByIds(new ArrayList<>(instanceIds), timestamp); + } + } +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/WorkerManager.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/WorkerManager.java new file mode 100644 index 00000000..25b3c2fb --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/WorkerManager.java @@ -0,0 +1,208 @@ +package io.openjob.server.cluster.manager; + +import io.openjob.common.OpenjobSpringContext; +import io.openjob.common.constant.CommonConstant; +import io.openjob.common.util.DateUtil; +import io.openjob.server.cluster.autoconfigure.ClusterProperties; +import io.openjob.server.cluster.data.RefreshData; +import io.openjob.server.cluster.dto.WorkerFailDTO; +import io.openjob.server.cluster.dto.WorkerJoinDTO; +import io.openjob.server.cluster.dto.WorkerStartReqDTO; +import io.openjob.server.cluster.dto.WorkerStartRespDTO; +import io.openjob.server.cluster.dto.WorkerStopReqDTO; +import io.openjob.server.cluster.dto.WorkerStopRespDTO; +import io.openjob.server.cluster.util.ClusterUtil; +import io.openjob.server.common.ClusterContext; +import io.openjob.server.common.util.SlotsUtil; +import io.openjob.server.repository.constant.WorkerStatusEnum; +import io.openjob.server.repository.dao.AppDAO; +import io.openjob.server.repository.dao.WorkerDAO; +import io.openjob.server.repository.entity.App; +import io.openjob.server.repository.entity.Worker; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Slf4j +@Component +public class WorkerManager { + private final WorkerDAO workerDAO; + private final AppDAO appDAO; + private final ClusterProperties clusterProperties; + private final RefreshData refreshManager; + + @Autowired + public WorkerManager(WorkerDAO workerDAO, AppDAO appDAO, ClusterProperties clusterProperties, RefreshData refreshManager) { + this.workerDAO = workerDAO; + this.appDAO = appDAO; + this.clusterProperties = clusterProperties; + this.refreshManager = refreshManager; + } + + /** + * Worker start + * + * @param reqDTO start request. + */ + public WorkerStartRespDTO workerStart(WorkerStartReqDTO reqDTO) { + // Check app name. + App app = this.checkAppName(reqDTO.getAppName()); + + // Do worker start. + OpenjobSpringContext.getBean(this.getClass()).doWorkerStart(reqDTO, app); + + // Akka message for worker start. + WorkerJoinDTO workerJoinDTO = new WorkerJoinDTO(); + workerJoinDTO.setClusterVersion(ClusterContext.getSystem().getClusterVersion()); + ClusterUtil.sendMessage(workerJoinDTO, ClusterContext.getCurrentNode(), this.clusterProperties.getSpreadSize(), new HashSet<>()); + + // Online workers and exclude start worker. + Set onlineWorkers = ClusterUtil.getOnlineWorkers(app.getId()); + + // Response + WorkerStartRespDTO response = new WorkerStartRespDTO(); + response.setAppId(app.getId()); + response.setAppName(app.getName()); + response.setWorkerAddressList(onlineWorkers); + return response; + } + + /** + * Do worker start. + * + * @param reqDTO reqDTO + */ + @Transactional(rollbackFor = Exception.class) + public void doWorkerStart(WorkerStartReqDTO reqDTO, App app) { + // Refresh system. + // Lock system cluster version. + this.refreshManager.refreshSystem(true); + + // Update worker status. + this.updateWorkerForStart(reqDTO, app); + + // Refresh cluster context. + refreshClusterContext(); + } + + /** + * Worker stop + * + * @param stopReq stop request. + */ + public WorkerStopRespDTO workerStop(WorkerStopReqDTO stopReq) { + // Check app name. + this.checkAppName(stopReq.getAppName()); + + // Do worker stop. + OpenjobSpringContext.getBean(this.getClass()).doWorkerStop(stopReq); + + // Akka message for worker start. + WorkerFailDTO workerFailDTO = new WorkerFailDTO(); + workerFailDTO.setClusterVersion(ClusterContext.getSystem().getClusterVersion()); + ClusterUtil.sendMessage(workerFailDTO, ClusterContext.getCurrentNode(), this.clusterProperties.getSpreadSize(), new HashSet<>()); + return new WorkerStopRespDTO(); + } + + /** + * Do worker stop. + * + * @param stopReq stopReq + */ + @Transactional(rollbackFor = Exception.class) + public void doWorkerStop(WorkerStopReqDTO stopReq) { + // Refresh system. + // Lock system cluster version. + this.refreshManager.refreshSystem(true); + + Worker worker = workerDAO.getByAddress(stopReq.getAddress()); + if (Objects.isNull(worker)) { + log.error("worker({}) do not exist!", stopReq.getAddress()); + return; + } + + // Update worker + worker.setStatus(WorkerStatusEnum.OFFLINE.getStatus()); + worker.setUpdateTime(DateUtil.timestamp()); + workerDAO.save(worker); + + // Refresh cluster context. + this.refreshClusterContext(); + } + + private void refreshClusterContext() { + List workers = workerDAO.listOnlineWorkers(); + ClusterUtil.refreshAppWorkers(workers); + + log.info("Refresh app workers={}", workers.stream().map(Worker::getAddress).collect(Collectors.toList())); + } + + /** + * Check app name. + * + * @param appName app name. + * @return App + */ + private App checkAppName(String appName) { + App app = appDAO.getAppByName(appName); + if (Objects.isNull(app)) { + throw new RuntimeException(String.format("Register application(%s) do not exist!", appName)); + } + return app; + } + + private void updateWorkerForStart(WorkerStartReqDTO startReq, App app) { + // Update worker. + long now = DateUtil.timestamp(); + Worker worker = workerDAO.getByAddress(startReq.getAddress()); + if (Objects.nonNull(worker)) { + worker.setWorkerKey(worker.getWorkerKey()); + worker.setStatus(WorkerStatusEnum.ONLINE.getStatus()); + + //Must update last heartbeat time. + worker.setLastHeartbeatTime(now); + worker.setUpdateTime(now); + + // Update latest app name + worker.setAppName(startReq.getAppName()); + worker.setNamespaceId(app.getNamespaceId()); + worker.setAppId(app.getId()); + worker.setProtocolType(startReq.getProtocolType()); + workerDAO.save(worker); + return; + } + + // save + Worker saveWorker = new Worker(); + saveWorker.setUpdateTime(now); + saveWorker.setCreateTime(now); + saveWorker.setWorkerKey(startReq.getWorkerKey()); + saveWorker.setAddress(startReq.getAddress()); + saveWorker.setSlotsId(SlotsUtil.getWorkerSupervisorSlotsId(UUID.randomUUID().toString())); + saveWorker.setStatus(WorkerStatusEnum.ONLINE.getStatus()); + saveWorker.setAppId(app.getId()); + saveWorker.setAppName(startReq.getAppName()); + saveWorker.setLastHeartbeatTime(now); + saveWorker.setNamespaceId(app.getNamespaceId()); + saveWorker.setProtocolType(startReq.getProtocolType()); + saveWorker.setVersion(startReq.getVersion()); + saveWorker.setMetric(""); + saveWorker.setVersion(""); + saveWorker.setWorkerKey(""); + saveWorker.setDeleteTime(0L); + saveWorker.setDeleted(CommonConstant.NO); + workerDAO.save(saveWorker); + } +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/ClusterService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/ClusterService.java index 0cd553c7..a14cd0de 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/ClusterService.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/ClusterService.java @@ -8,7 +8,7 @@ import io.openjob.server.cluster.dto.NodeShutdownDTO; import io.openjob.server.cluster.dto.WorkerFailDTO; import io.openjob.server.cluster.dto.WorkerJoinDTO; -import io.openjob.server.cluster.manager.FailManager; +import io.openjob.server.cluster.common.FailCommon; import io.openjob.server.cluster.data.RefreshData; import io.openjob.server.cluster.util.ClusterUtil; import io.openjob.server.common.ClusterContext; @@ -32,7 +32,7 @@ public class ClusterService { private final RefreshData refreshData; private final Scheduler scheduler; private final ClusterProperties clusterProperties; - private final FailManager failManager; + private final FailCommon failCommon; /** * Refresh status. @@ -41,11 +41,11 @@ public class ClusterService { private final AtomicBoolean nodeRunning = new AtomicBoolean(false); @Autowired - public ClusterService(RefreshData refreshData, Scheduler scheduler, ClusterProperties clusterProperties, FailManager failManager) { + public ClusterService(RefreshData refreshData, Scheduler scheduler, ClusterProperties clusterProperties, FailCommon failCommon) { this.refreshData = refreshData; this.scheduler = scheduler; this.clusterProperties = clusterProperties; - this.failManager = failManager; + this.failCommon = failCommon; } /** @@ -161,7 +161,7 @@ public void receiveNodeShutdown(NodeShutdownDTO shutdown) { stopNode.setAkkaAddress(shutdown.getAkkaAddress()); stopNode.setIp(shutdown.getIp()); stopNode.setServerId(shutdown.getServerId()); - this.failManager.fail(stopNode); + this.failCommon.fail(stopNode); } /** diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/HealthService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/HealthService.java index 36e44f03..696437e3 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/HealthService.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/HealthService.java @@ -10,8 +10,8 @@ import io.openjob.server.cluster.autoconfigure.ClusterProperties; import io.openjob.server.cluster.dto.NodePingDTO; import io.openjob.server.cluster.dto.NodePongDTO; -import io.openjob.server.cluster.manager.FailManager; -import io.openjob.server.cluster.manager.JoinManager; +import io.openjob.server.cluster.common.FailCommon; +import io.openjob.server.cluster.common.JoinCommon; import io.openjob.server.cluster.util.ClusterUtil; import io.openjob.server.common.ClusterContext; import io.openjob.server.common.constant.AkkaConfigConstant; @@ -38,21 +38,21 @@ @Log4j2 @Service public class HealthService { - private final FailManager failManager; + private final FailCommon failCommon; private final ServerReportsDAO serverReportsDAO; private final ClusterProperties clusterProperties; private final JobSlotsDAO jobSlotsDAO; - private final JoinManager joinManager; + private final JoinCommon joinCommon; private final ActorSystem actorSystem; @Autowired - public HealthService(ServerReportsDAO serverReportsDAO, FailManager failManager, ClusterProperties clusterProperties, JobSlotsDAO jobSlotsDAO, JoinManager joinManager, ActorSystem actorSystem) { + public HealthService(ServerReportsDAO serverReportsDAO, FailCommon failCommon, ClusterProperties clusterProperties, JobSlotsDAO jobSlotsDAO, JoinCommon joinCommon, ActorSystem actorSystem) { this.serverReportsDAO = serverReportsDAO; - this.failManager = failManager; + this.failCommon = failCommon; this.clusterProperties = clusterProperties; this.jobSlotsDAO = jobSlotsDAO; - this.joinManager = joinManager; + this.joinCommon = joinCommon; this.actorSystem = actorSystem; } @@ -123,7 +123,7 @@ public void checkFailReports(Long failServerId, Node failNode) { // Offline if (reportsCount > this.clusterProperties.getNodeFailTimes()) { - this.failManager.fail(failNode); + this.failCommon.fail(failNode); } } @@ -155,7 +155,7 @@ public void checkOnline(Node reportNode) { Config config = this.actorSystem.settings().config(); Integer bindPort = config.getInt(AkkaConfigConstant.AKKA_CANONICAL_PORT); String bindHostname = config.getString(AkkaConfigConstant.AKKA_CANONICAL_HOSTNAME); - this.joinManager.join(bindHostname, bindPort); + this.joinCommon.join(bindHostname, bindPort); } } } \ No newline at end of file diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java index dc355135..0f0341e9 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java @@ -1,29 +1,14 @@ package io.openjob.server.cluster.service; -import io.openjob.common.constant.CommonConstant; -import io.openjob.common.constant.FailStatusEnum; -import io.openjob.common.constant.InstanceStatusEnum; import io.openjob.common.request.WorkerJobInstanceStatusRequest; import io.openjob.common.request.WorkerJobInstanceTaskBatchRequest; -import io.openjob.server.alarm.constant.AlarmEventEnum; -import io.openjob.server.alarm.dto.AlarmEventDTO; -import io.openjob.server.alarm.event.AlarmEvent; -import io.openjob.server.alarm.event.AlarmEventPublisher; -import io.openjob.server.cluster.executor.WorkerJobInstanceExecutor; -import io.openjob.server.cluster.executor.WorkerJobInstanceTaskExecutor; -import io.openjob.server.repository.dao.JobInstanceDAO; -import io.openjob.server.repository.dao.JobInstanceTaskDAO; -import io.openjob.server.repository.entity.JobInstanceTask; +import io.openjob.server.cluster.dto.WorkerJobInstanceStatusReqDTO; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskBatchReqDTO; +import io.openjob.server.cluster.manager.JobInstanceManager; +import io.openjob.server.common.util.BeanMapperUtil; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Service; -import org.springframework.transaction.UnexpectedRollbackException; -import org.springframework.transaction.annotation.Transactional; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; /** * @author stelin swoft@qq.com @@ -32,96 +17,19 @@ @Service @Log4j2 public class JobInstanceService { - private final JobInstanceTaskDAO jobInstanceTaskDAO; - private final JobInstanceDAO jobInstanceDAO; - private final WorkerJobInstanceExecutor workerJobInstanceExecutor; - private final WorkerJobInstanceTaskExecutor workerJobInstanceTaskExecutor; + private final JobInstanceManager jobInstanceManager; @Autowired - public JobInstanceService(JobInstanceTaskDAO jobInstanceTaskDAO, - JobInstanceDAO jobInstanceDAO, - WorkerJobInstanceExecutor workerJobInstanceExecutor, - WorkerJobInstanceTaskExecutor workerJobInstanceTaskExecutor) { - this.jobInstanceTaskDAO = jobInstanceTaskDAO; - this.jobInstanceDAO = jobInstanceDAO; - this.workerJobInstanceExecutor = workerJobInstanceExecutor; - this.workerJobInstanceTaskExecutor = workerJobInstanceTaskExecutor; + public JobInstanceService(JobInstanceManager jobInstanceManager) { + this.jobInstanceManager = jobInstanceManager; } - @Transactional(rollbackFor = Exception.class, timeout = 1) public void handleInstanceStatus(WorkerJobInstanceStatusRequest statusRequest) { - this.workerJobInstanceExecutor.submit(statusRequest); + this.jobInstanceManager.handleInstanceStatus(BeanMapperUtil.map(statusRequest, WorkerJobInstanceStatusReqDTO.class)); } - /** - * Handle instance status. - * - * @param statusList status request. - */ - @Transactional(rollbackFor = Exception.class) - public void handleConsumerInstanceStatus(List statusList) { - // Update status - statusList.forEach(s -> { - this.jobInstanceDAO.updateStatusById(s.getJobInstanceId(), s.getStatus(), s.getFailStatus()); - this.addAlarmEvent(s); - }); - } - @Transactional(rollbackFor = Exception.class, timeout = 1) public void handleInstanceTasks(WorkerJobInstanceTaskBatchRequest taskBatchRequest) { - this.workerJobInstanceTaskExecutor.submit(taskBatchRequest); - } - - /** - * Handle instance status. - * - * @param taskList task list - */ - @Transactional(rollbackFor = Exception.class) - public void handleConsumerInstanceTasks(List taskList) { - List saveList = new ArrayList<>(); - taskList.forEach(l -> l.getTaskRequestList().forEach(t -> { - JobInstanceTask jobInstanceTask = new JobInstanceTask(); - jobInstanceTask.setJobId(t.getJobId()); - jobInstanceTask.setJobInstanceId(t.getJobInstanceId()); - jobInstanceTask.setDispatchVersion(t.getDispatchVersion()); - jobInstanceTask.setCircleId(t.getCircleId()); - jobInstanceTask.setTaskId(t.getTaskId()); - jobInstanceTask.setParentTaskId(t.getParentTaskId()); - jobInstanceTask.setTaskName(t.getTaskName()); - jobInstanceTask.setStatus(t.getStatus()); - jobInstanceTask.setResult(t.getResult()); - jobInstanceTask.setWorkerAddress(t.getWorkerAddress()); - jobInstanceTask.setDeleted(CommonConstant.NO); - jobInstanceTask.setDeleteTime(0L); - jobInstanceTask.setCreateTime(t.getCreateTime()); - jobInstanceTask.setUpdateTime(t.getUpdateTime()); - saveList.add(jobInstanceTask); - })); - - try { - this.jobInstanceTaskDAO.batchSave(saveList); - } catch (DataIntegrityViolationException | UnexpectedRollbackException exception) { - log.warn("Data has been saved! {}", taskList); - } - } - - protected void addAlarmEvent(WorkerJobInstanceStatusRequest statusRequest) { - if (InstanceStatusEnum.isFailed(statusRequest.getStatus())) { - AlarmEventDTO alarmEventDTO = new AlarmEventDTO(); - alarmEventDTO.setJobUniqueId(String.valueOf(statusRequest.getJobId())); - alarmEventDTO.setInstanceId(String.valueOf(statusRequest.getJobInstanceId())); - - // Fail status - if (FailStatusEnum.isExecuteTimeout(statusRequest.getFailStatus())) { - alarmEventDTO.setName(AlarmEventEnum.JOB_EXECUTE_TIMEOUT.getEvent()); - } else { - alarmEventDTO.setName(AlarmEventEnum.JOB_EXECUTE_FAIL.getEvent()); - } - - // Event message - alarmEventDTO.setMessage(Optional.ofNullable(statusRequest.getResult()).orElse("")); - AlarmEventPublisher.publishEvent(new AlarmEvent(alarmEventDTO)); - } + this.jobInstanceManager.handleInstanceTasks(BeanMapperUtil.map(taskBatchRequest, WorkerJobInstanceTaskBatchReqDTO.class)); } } diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java index 89773bd2..783ca8e7 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceTaskLogService.java @@ -1,23 +1,13 @@ package io.openjob.server.cluster.service; -import com.google.common.collect.Lists; -import io.openjob.common.constant.LogFieldConstant; -import io.openjob.common.request.WorkerJobInstanceTaskLogFieldRequest; import io.openjob.common.request.WorkerJobInstanceTaskLogRequest; -import io.openjob.server.cluster.executor.WorkerTaskLogExecutor; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskLogReqDTO; +import io.openjob.server.cluster.manager.JobInstanceTaskLogManager; import io.openjob.server.common.util.BeanMapperUtil; -import io.openjob.server.log.dao.LogDAO; -import io.openjob.server.log.dto.ProcessorLogDTO; -import io.openjob.server.log.dto.ProcessorLogFieldDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; - /** * @author stelin swoft@qq.com * @since 1.0.0 @@ -25,60 +15,20 @@ @Slf4j @Service public class JobInstanceTaskLogService { - private final LogDAO logDAO; - private final WorkerTaskLogExecutor executor; + private final JobInstanceTaskLogManager jobInstanceTaskLogManager; @Autowired - public JobInstanceTaskLogService(LogDAO logDAO, WorkerTaskLogExecutor executor) { - this.logDAO = logDAO; - this.executor = executor; + public JobInstanceTaskLogService(JobInstanceTaskLogManager jobInstanceTaskLogManager) { + this.jobInstanceTaskLogManager = jobInstanceTaskLogManager; } + /** * Handle instance log. * * @param logReq log request. */ public void handleInstanceTaskLog(WorkerJobInstanceTaskLogRequest logReq) { - this.executor.submit(logReq); - } - - /** - * Batch instance log - * - * @param requests requests - */ - public void batchInstanceTaskLog(List requests) { - List processorLogList = requests.stream().flatMap(r -> r.getFieldList().stream().map(fields -> { - // Field map. - Map> fieldMap = fields.stream() - .collect(Collectors.groupingBy(WorkerJobInstanceTaskLogFieldRequest::getName)); - - // Task id. - String taskId = Optional.ofNullable(fieldMap.get(LogFieldConstant.TASK_ID)) - .orElseGet(() -> Lists.newArrayList(new WorkerJobInstanceTaskLogFieldRequest())).get(0).getValue(); - - // Worker address. - String workerAddress = Optional.ofNullable(fieldMap.get(LogFieldConstant.WORKER_ADDRESS)) - .orElseGet(() -> Lists.newArrayList(new WorkerJobInstanceTaskLogFieldRequest())).get(0).getValue(); - - // Time - String timeStamp = Optional.ofNullable(fieldMap.get(LogFieldConstant.TIME_STAMP)) - .orElseGet(() -> Lists.newArrayList(new WorkerJobInstanceTaskLogFieldRequest())).get(0).getValue(); - - ProcessorLogDTO processorLog = new ProcessorLogDTO(); - processorLog.setTaskId(taskId); - processorLog.setWorkerAddress(workerAddress); - processorLog.setTime(Long.valueOf(timeStamp)); - processorLog.setFields(BeanMapperUtil.mapList(fields, WorkerJobInstanceTaskLogFieldRequest.class, ProcessorLogFieldDTO.class)); - return processorLog; - })).collect(Collectors.toList()); - - try { - logDAO.batchAdd(processorLogList); - } catch (Exception e) { - log.error("Batch add task log failed!", e); - throw new RuntimeException(e); - } + this.jobInstanceTaskLogManager.handleInstanceTaskLog(BeanMapperUtil.map(logReq, WorkerJobInstanceTaskLogReqDTO.class)); } } diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JoinService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JoinService.java index 4347a0fb..efea1bb3 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JoinService.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JoinService.java @@ -1,6 +1,6 @@ package io.openjob.server.cluster.service; -import io.openjob.server.cluster.manager.JoinManager; +import io.openjob.server.cluster.common.JoinCommon; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -12,11 +12,11 @@ @Slf4j @Service public class JoinService { - private final JoinManager joinManager; + private final JoinCommon joinCommon; @Autowired - public JoinService(JoinManager joinManager) { - this.joinManager = joinManager; + public JoinService(JoinCommon joinCommon) { + this.joinCommon = joinCommon; } /** @@ -26,6 +26,6 @@ public JoinService(JoinManager joinManager) { * @param port port */ public void join(String hostname, Integer port) { - this.joinManager.join(hostname, port); + this.joinCommon.join(hostname, port); } } diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerHeartbeatService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerHeartbeatService.java index d4b74dd8..318ec501 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerHeartbeatService.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerHeartbeatService.java @@ -3,23 +3,13 @@ import io.openjob.common.request.WorkerHeartbeatRequest; import io.openjob.common.response.ServerHeartbeatResponse; import io.openjob.common.response.ServerHeartbeatSystemResponse; -import io.openjob.common.util.DateUtil; -import io.openjob.server.cluster.executor.WorkerHeartbeatExecutor; -import io.openjob.server.cluster.util.ClusterUtil; -import io.openjob.server.common.ClusterContext; -import io.openjob.server.common.dto.SystemDTO; -import io.openjob.server.repository.dao.JobInstanceDAO; -import io.openjob.server.repository.dao.WorkerDAO; +import io.openjob.server.cluster.dto.WorkerHeartbeatReqDTO; +import io.openjob.server.cluster.dto.WorkerHeartbeatRespDTO; +import io.openjob.server.cluster.manager.WorkerHeartbeatManager; +import io.openjob.server.common.util.BeanMapperUtil; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.util.CollectionUtils; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; /** * @author stelin swoft@qq.com @@ -28,17 +18,11 @@ @Service @Log4j2 public class WorkerHeartbeatService { - private final WorkerDAO workerDAO; - private final JobInstanceDAO jobInstanceDAO; - private final WorkerHeartbeatExecutor workerHeartbeatExecutor; + private final WorkerHeartbeatManager workerHeartbeatManager; @Autowired - public WorkerHeartbeatService(WorkerDAO workerDAO, - JobInstanceDAO jobInstanceDAO, - WorkerHeartbeatExecutor workerHeartbeatExecutor) { - this.workerDAO = workerDAO; - this.jobInstanceDAO = jobInstanceDAO; - this.workerHeartbeatExecutor = workerHeartbeatExecutor; + public WorkerHeartbeatService(WorkerHeartbeatManager workerHeartbeatManager) { + this.workerHeartbeatManager = workerHeartbeatManager; } /** @@ -47,51 +31,17 @@ public WorkerHeartbeatService(WorkerDAO workerDAO, * @param heartbeatReq heartbeat request. */ public ServerHeartbeatResponse workerHeartbeat(WorkerHeartbeatRequest heartbeatReq) { - //Submit request - this.workerHeartbeatExecutor.submit(heartbeatReq); + WorkerHeartbeatRespDTO workerHeartbeatRespDTO = this.workerHeartbeatManager.workerHeartbeat(BeanMapperUtil.map(heartbeatReq, WorkerHeartbeatReqDTO.class)); - ServerHeartbeatResponse response = new ServerHeartbeatResponse(); + // System response ServerHeartbeatSystemResponse systemResponse = new ServerHeartbeatSystemResponse(); - - // System information. - SystemDTO system = ClusterContext.getSystem(); - systemResponse.setClusterVersion(system.getClusterVersion()); - systemResponse.setClusterDelayVersion(system.getClusterDelayVersion()); - response.setSystemResponse(systemResponse); - - // Online workers and exclude start worker. - Set onlineWorkers = ClusterUtil.getOnlineWorkers(heartbeatReq.getAppId()); - response.setWorkerAddressList(onlineWorkers); - return response; - } - - /** - * Batch heartbeat - * - * @param requests requests - */ - @Transactional(rollbackFor = Exception.class) - public void batchHeartbeat(List requests) { - Long timestamp = DateUtil.timestamp(); - Set addressList = new HashSet<>(); - Set instanceIds = new HashSet<>(); - - // Merge requests - requests.forEach(r -> { - addressList.add(r.getAddress()); - if (!CollectionUtils.isEmpty(r.getRunningJobInstanceIds())) { - instanceIds.addAll(r.getRunningJobInstanceIds()); - } - }); - - // Worker heartbeat - if (!CollectionUtils.isEmpty(addressList)) { - this.workerDAO.updateLastHeartbeatTimeByAddresses(new ArrayList<>(addressList), timestamp); - } - - // Instance last report time. - if (!CollectionUtils.isEmpty(instanceIds)) { - this.jobInstanceDAO.updateLastReportTimeByIds(new ArrayList<>(instanceIds), timestamp); - } + systemResponse.setClusterDelayVersion(workerHeartbeatRespDTO.getClusterDelayVersion()); + systemResponse.setClusterVersion(workerHeartbeatRespDTO.getClusterVersion()); + + // Response + ServerHeartbeatResponse serverHeartbeatResponse = new ServerHeartbeatResponse(); + serverHeartbeatResponse.setWorkerAddressList(workerHeartbeatRespDTO.getWorkerAddressList()); + serverHeartbeatResponse.setSystemResponse(systemResponse); + return serverHeartbeatResponse; } } diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerService.java index 1afaa412..f04a9bbf 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerService.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerService.java @@ -1,37 +1,29 @@ package io.openjob.server.cluster.service; import com.google.common.collect.Lists; -import io.openjob.common.OpenjobSpringContext; -import io.openjob.common.constant.CommonConstant; import io.openjob.common.request.WorkerStartRequest; import io.openjob.common.request.WorkerStopRequest; import io.openjob.common.response.ServerWorkerStartResponse; import io.openjob.common.util.DateUtil; import io.openjob.server.cluster.autoconfigure.ClusterProperties; -import io.openjob.server.cluster.dto.WorkerFailDTO; -import io.openjob.server.cluster.dto.WorkerJoinDTO; -import io.openjob.server.cluster.data.RefreshData; -import io.openjob.server.cluster.util.ClusterUtil; +import io.openjob.server.cluster.dto.WorkerStartReqDTO; +import io.openjob.server.cluster.dto.WorkerStartRespDTO; +import io.openjob.server.cluster.dto.WorkerStopReqDTO; +import io.openjob.server.cluster.manager.WorkerManager; import io.openjob.server.common.ClusterContext; -import io.openjob.server.common.util.SlotsUtil; +import io.openjob.server.common.util.BeanMapperUtil; import io.openjob.server.repository.constant.WorkerStatusEnum; -import io.openjob.server.repository.dao.AppDAO; import io.openjob.server.repository.dao.WorkerDAO; -import io.openjob.server.repository.entity.App; import io.openjob.server.repository.entity.Worker; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; /** @@ -42,16 +34,14 @@ @Log4j2 public class WorkerService { private final WorkerDAO workerDAO; - private final AppDAO appDAO; private final ClusterProperties clusterProperties; - private final RefreshData refreshManager; + private final WorkerManager workerManager; @Autowired - public WorkerService(WorkerDAO workerDAO, AppDAO appDAO, ClusterProperties clusterProperties, RefreshData refreshManager) { + public WorkerService(WorkerDAO workerDAO, ClusterProperties clusterProperties, WorkerManager workerManager) { this.workerDAO = workerDAO; - this.appDAO = appDAO; this.clusterProperties = clusterProperties; - this.refreshManager = refreshManager; + this.workerManager = workerManager; } /** @@ -60,44 +50,9 @@ public WorkerService(WorkerDAO workerDAO, AppDAO appDAO, ClusterProperties clust * @param startRequest start request. */ public ServerWorkerStartResponse workerStart(WorkerStartRequest startRequest) { - // Check app name. - App app = this.checkAppName(startRequest.getAppName()); - - // Do worker start. - OpenjobSpringContext.getBean(this.getClass()).doWorkerStart(startRequest, app); - - // Akka message for worker start. - WorkerJoinDTO workerJoinDTO = new WorkerJoinDTO(); - workerJoinDTO.setClusterVersion(ClusterContext.getSystem().getClusterVersion()); - ClusterUtil.sendMessage(workerJoinDTO, ClusterContext.getCurrentNode(), this.clusterProperties.getSpreadSize(), new HashSet<>()); - - // Online workers and exclude start worker. - Set onlineWorkers = ClusterUtil.getOnlineWorkers(app.getId()); - - // Response - ServerWorkerStartResponse response = new ServerWorkerStartResponse(); - response.setAppId(app.getId()); - response.setAppName(app.getName()); - response.setWorkerAddressList(onlineWorkers); - return response; - } - - /** - * Do worker start. - * - * @param workerStartRequest workerStartRequest - */ - @Transactional(rollbackFor = Exception.class) - public void doWorkerStart(WorkerStartRequest workerStartRequest, App app) { - // Refresh system. - // Lock system cluster version. - this.refreshManager.refreshSystem(true); - - // Update worker status. - this.updateWorkerForStart(workerStartRequest, app); - - // Refresh cluster context. - refreshClusterContext(); + WorkerStartReqDTO reqDTO = BeanMapperUtil.map(startRequest, WorkerStartReqDTO.class); + WorkerStartRespDTO workerStartRespDTO = this.workerManager.workerStart(reqDTO); + return BeanMapperUtil.map(workerStartRespDTO, ServerWorkerStartResponse.class); } /** @@ -106,45 +61,9 @@ public void doWorkerStart(WorkerStartRequest workerStartRequest, App app) { * @param stopReq stop request. */ public void workerStop(WorkerStopRequest stopReq) { - // Check app name. - this.checkAppName(stopReq.getAppName()); - - // Do worker stop. - OpenjobSpringContext.getBean(this.getClass()).doWorkerStop(stopReq); - - // Akka message for worker start. - WorkerFailDTO workerFailDTO = new WorkerFailDTO(); - workerFailDTO.setClusterVersion(ClusterContext.getSystem().getClusterVersion()); - ClusterUtil.sendMessage(workerFailDTO, ClusterContext.getCurrentNode(), this.clusterProperties.getSpreadSize(), new HashSet<>()); + this.workerManager.workerStop(BeanMapperUtil.map(stopReq, WorkerStopReqDTO.class)); } - /** - * Do worker stop. - * - * @param stopReq stopReq - */ - @Transactional(rollbackFor = Exception.class) - public void doWorkerStop(WorkerStopRequest stopReq) { - // Refresh system. - // Lock system cluster version. - this.refreshManager.refreshSystem(true); - - Worker worker = workerDAO.getByAddress(stopReq.getAddress()); - if (Objects.isNull(worker)) { - log.error("worker({}) do not exist!", stopReq.getAddress()); - return; - } - - // Update worker - worker.setStatus(WorkerStatusEnum.OFFLINE.getStatus()); - worker.setUpdateTime(DateUtil.timestamp()); - workerDAO.save(worker); - - // Refresh cluster context. - this.refreshClusterContext(); - } - - /** * Worker check */ @@ -198,68 +117,4 @@ public void workerCheck() { } }); } - - private void refreshClusterContext() { - List workers = workerDAO.listOnlineWorkers(); - ClusterUtil.refreshAppWorkers(workers); - - log.info("Refresh app workers={}", workers.stream().map(Worker::getAddress).collect(Collectors.toList())); - } - - /** - * Check app name. - * - * @param appName app name. - * @return App - */ - private App checkAppName(String appName) { - App app = appDAO.getAppByName(appName); - if (Objects.isNull(app)) { - throw new RuntimeException(String.format("Register application(%s) do not exist!", appName)); - } - return app; - } - - private void updateWorkerForStart(WorkerStartRequest startReq, App app) { - // Update worker. - long now = DateUtil.timestamp(); - Worker worker = workerDAO.getByAddress(startReq.getAddress()); - if (Objects.nonNull(worker)) { - worker.setWorkerKey(worker.getWorkerKey()); - worker.setStatus(WorkerStatusEnum.ONLINE.getStatus()); - - //Must update last heartbeat time. - worker.setLastHeartbeatTime(now); - worker.setUpdateTime(now); - - // Update latest app name - worker.setAppName(startReq.getAppName()); - worker.setNamespaceId(app.getNamespaceId()); - worker.setAppId(app.getId()); - worker.setProtocolType(startReq.getProtocolType()); - workerDAO.save(worker); - return; - } - - // save - Worker saveWorker = new Worker(); - saveWorker.setUpdateTime(now); - saveWorker.setCreateTime(now); - saveWorker.setWorkerKey(startReq.getWorkerKey()); - saveWorker.setAddress(startReq.getAddress()); - saveWorker.setSlotsId(SlotsUtil.getWorkerSupervisorSlotsId(UUID.randomUUID().toString())); - saveWorker.setStatus(WorkerStatusEnum.ONLINE.getStatus()); - saveWorker.setAppId(app.getId()); - saveWorker.setAppName(startReq.getAppName()); - saveWorker.setLastHeartbeatTime(now); - saveWorker.setNamespaceId(app.getNamespaceId()); - saveWorker.setProtocolType(startReq.getProtocolType()); - saveWorker.setVersion(startReq.getVersion()); - saveWorker.setMetric(""); - saveWorker.setVersion(""); - saveWorker.setWorkerKey(""); - saveWorker.setDeleteTime(0L); - saveWorker.setDeleted(CommonConstant.NO); - workerDAO.save(saveWorker); - } } diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerHeartConsumer.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerHeartConsumer.java index e2cee9cd..624900d0 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerHeartConsumer.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerHeartConsumer.java @@ -1,10 +1,10 @@ package io.openjob.server.cluster.task; import io.openjob.common.OpenjobSpringContext; -import io.openjob.common.request.WorkerHeartbeatRequest; import io.openjob.common.task.BaseConsumer; import io.openjob.common.task.TaskQueue; -import io.openjob.server.cluster.service.WorkerHeartbeatService; +import io.openjob.server.cluster.dto.WorkerHeartbeatReqDTO; +import io.openjob.server.cluster.manager.WorkerHeartbeatManager; import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -14,7 +14,7 @@ * @since 1.0.3 */ @Slf4j -public class WorkerHeartConsumer extends BaseConsumer { +public class WorkerHeartConsumer extends BaseConsumer { public WorkerHeartConsumer(Long id, Integer consumerCoreThreadNum, @@ -22,12 +22,12 @@ public WorkerHeartConsumer(Long id, String consumerThreadName, Integer pollSize, String pollThreadName, - TaskQueue queues) { + TaskQueue queues) { super(id, consumerCoreThreadNum, consumerMaxThreadNum, consumerThreadName, pollSize, pollThreadName, queues, 2000L, 1000L); } @Override - public void consume(Long id, List tasks) { + public void consume(Long id, List tasks) { this.consumerExecutor.submit(new WorkerHeartbeatConsumerRunnable(tasks)); } @@ -35,16 +35,16 @@ public void consume(Long id, List tasks) { * Worker heartbeat runnable */ private static class WorkerHeartbeatConsumerRunnable implements Runnable { - private final List tasks; + private final List tasks; - private WorkerHeartbeatConsumerRunnable(List tasks) { + private WorkerHeartbeatConsumerRunnable(List tasks) { this.tasks = tasks; } @Override public void run() { try { - OpenjobSpringContext.getBean(WorkerHeartbeatService.class).batchHeartbeat(this.tasks); + OpenjobSpringContext.getBean(WorkerHeartbeatManager.class).batchHeartbeat(this.tasks); } catch (Throwable throwable) { log.error("Worker heartbeat failed!", throwable); } diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceConsumer.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceConsumer.java index ae4a9dc4..08183e5d 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceConsumer.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceConsumer.java @@ -1,10 +1,10 @@ package io.openjob.server.cluster.task; import io.openjob.common.OpenjobSpringContext; -import io.openjob.common.request.WorkerJobInstanceStatusRequest; import io.openjob.common.task.BaseConsumer; import io.openjob.common.task.TaskQueue; -import io.openjob.server.cluster.service.JobInstanceService; +import io.openjob.server.cluster.dto.WorkerJobInstanceStatusReqDTO; +import io.openjob.server.cluster.manager.JobInstanceManager; import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -14,33 +14,33 @@ * @since 1.0.6 */ @Slf4j -public class WorkerJobInstanceConsumer extends BaseConsumer { +public class WorkerJobInstanceConsumer extends BaseConsumer { public WorkerJobInstanceConsumer(Long id, Integer consumerCoreThreadNum, Integer consumerMaxThreadNum, String consumerThreadName, Integer pollSize, String pollThreadName, - TaskQueue queues) { + TaskQueue queues) { super(id, consumerCoreThreadNum, consumerMaxThreadNum, consumerThreadName, pollSize, pollThreadName, queues, 1000L, 1000L); } @Override - public void consume(Long id, List tasks) { + public void consume(Long id, List tasks) { this.consumerExecutor.submit(new WorkerJobInstanceConsumerRunnable(tasks)); } private static class WorkerJobInstanceConsumerRunnable implements Runnable { - private final List tasks; + private final List tasks; - private WorkerJobInstanceConsumerRunnable(List tasks) { + private WorkerJobInstanceConsumerRunnable(List tasks) { this.tasks = tasks; } @Override public void run() { try { - OpenjobSpringContext.getBean(JobInstanceService.class).handleConsumerInstanceStatus(tasks); + OpenjobSpringContext.getBean(JobInstanceManager.class).handleConsumerInstanceStatus(tasks); } catch (Throwable throwable) { log.error("Handler consumer instance status failed!", throwable); } diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceTaskConsumer.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceTaskConsumer.java index 1f46ac45..cf21c063 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceTaskConsumer.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceTaskConsumer.java @@ -1,10 +1,10 @@ package io.openjob.server.cluster.task; import io.openjob.common.OpenjobSpringContext; -import io.openjob.common.request.WorkerJobInstanceTaskBatchRequest; import io.openjob.common.task.BaseConsumer; import io.openjob.common.task.TaskQueue; -import io.openjob.server.cluster.service.JobInstanceService; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskBatchReqDTO; +import io.openjob.server.cluster.manager.JobInstanceManager; import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -14,33 +14,33 @@ * @since 1.0.6 */ @Slf4j -public class WorkerJobInstanceTaskConsumer extends BaseConsumer { +public class WorkerJobInstanceTaskConsumer extends BaseConsumer { public WorkerJobInstanceTaskConsumer(Long id, Integer consumerCoreThreadNum, Integer consumerMaxThreadNum, String consumerThreadName, Integer pollSize, String pollThreadName, - TaskQueue queues) { + TaskQueue queues) { super(id, consumerCoreThreadNum, consumerMaxThreadNum, consumerThreadName, pollSize, pollThreadName, queues, 1000L, 1000L); } @Override - public void consume(Long id, List tasks) { + public void consume(Long id, List tasks) { this.consumerExecutor.submit(new WorkerJobInstanceTaskConsumerRunnable(tasks)); } private static class WorkerJobInstanceTaskConsumerRunnable implements Runnable { - private final List tasks; + private final List tasks; - private WorkerJobInstanceTaskConsumerRunnable(List tasks) { + private WorkerJobInstanceTaskConsumerRunnable(List tasks) { this.tasks = tasks; } @Override public void run() { try { - OpenjobSpringContext.getBean(JobInstanceService.class).handleConsumerInstanceTasks(tasks); + OpenjobSpringContext.getBean(JobInstanceManager.class).handleConsumerInstanceTasks(tasks); } catch (Throwable throwable) { log.error("Handler consumer instance status failed!", throwable); } diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerTaskLogConsumer.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerTaskLogConsumer.java index fa611969..135fc1b2 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerTaskLogConsumer.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerTaskLogConsumer.java @@ -1,9 +1,10 @@ package io.openjob.server.cluster.task; import io.openjob.common.OpenjobSpringContext; -import io.openjob.common.request.WorkerJobInstanceTaskLogRequest; import io.openjob.common.task.BaseConsumer; import io.openjob.common.task.TaskQueue; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskLogReqDTO; +import io.openjob.server.cluster.manager.JobInstanceTaskLogManager; import io.openjob.server.cluster.service.JobInstanceTaskLogService; import lombok.extern.slf4j.Slf4j; @@ -14,7 +15,7 @@ * @since 1.0.3 */ @Slf4j -public class WorkerTaskLogConsumer extends BaseConsumer { +public class WorkerTaskLogConsumer extends BaseConsumer { public WorkerTaskLogConsumer(Long id, Integer consumerCoreThreadNum, @@ -22,26 +23,26 @@ public WorkerTaskLogConsumer(Long id, String consumerThreadName, Integer pollSize, String pollThreadName, - TaskQueue queues) { + TaskQueue queues) { super(id, consumerCoreThreadNum, consumerMaxThreadNum, consumerThreadName, pollSize, pollThreadName, queues, 1000L, 1000L); } @Override - public void consume(Long id, List tasks) { + public void consume(Long id, List tasks) { this.consumerExecutor.submit(new WorkerTaskLogRunnable(tasks)); } private static class WorkerTaskLogRunnable implements Runnable { - private final List tasks; + private final List tasks; - private WorkerTaskLogRunnable(List tasks) { + private WorkerTaskLogRunnable(List tasks) { this.tasks = tasks; } @Override public void run() { try { - OpenjobSpringContext.getBean(JobInstanceTaskLogService.class).batchInstanceTaskLog(this.tasks); + OpenjobSpringContext.getBean(JobInstanceTaskLogManager.class).batchInstanceTaskLog(this.tasks); } catch (Throwable throwable) { log.error("Job instance log failed!", throwable); } diff --git a/openjob-server/openjob-server-common/pom.xml b/openjob-server/openjob-server-common/pom.xml index db90d985..e4a4ef95 100644 --- a/openjob-server/openjob-server-common/pom.xml +++ b/openjob-server/openjob-server-common/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.7 + 1.0.8 4.0.0 openjob-server-common diff --git a/openjob-server/openjob-server-dispatcher/pom.xml b/openjob-server/openjob-server-dispatcher/pom.xml index 095a11fb..2e9bbccb 100644 --- a/openjob-server/openjob-server-dispatcher/pom.xml +++ b/openjob-server/openjob-server-dispatcher/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.7 + 1.0.8 4.0.0 diff --git a/openjob-server/openjob-server-log/pom.xml b/openjob-server/openjob-server-log/pom.xml index 0675a884..b4e8494a 100644 --- a/openjob-server/openjob-server-log/pom.xml +++ b/openjob-server/openjob-server-log/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.7 + 1.0.8 4.0.0 openjob-server-log diff --git a/openjob-server/openjob-server-openapi/pom.xml b/openjob-server/openjob-server-openapi/pom.xml index 2a090fda..5f74a184 100644 --- a/openjob-server/openjob-server-openapi/pom.xml +++ b/openjob-server/openjob-server-openapi/pom.xml @@ -5,12 +5,17 @@ openjob-server io.openjob - 1.0.7 + 1.0.8 4.0.0 openjob-server-openapi + + io.openjob + openjob-server-cluster + + io.openjob openjob-server-repository diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/autoconfigure/ClusterAutoConfiguration.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/autoconfigure/ClusterAutoConfiguration.java new file mode 100644 index 00000000..50ca7db4 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/autoconfigure/ClusterAutoConfiguration.java @@ -0,0 +1,13 @@ +package io.openjob.server.openapi.autoconfigure; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * @author stelin swoft@qq.com + * @since 1.0.0 + */ +@Configuration +@EnableConfigurationProperties(ClusterProperties.class) +public class ClusterAutoConfiguration { +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/autoconfigure/ClusterProperties.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/autoconfigure/ClusterProperties.java new file mode 100644 index 00000000..5230b2d3 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/autoconfigure/ClusterProperties.java @@ -0,0 +1,67 @@ +package io.openjob.server.openapi.autoconfigure; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * @author stelin swoft@qq.com + * @since 1.0.0 + */ +@Data +@ConfigurationProperties(prefix = "openjob.cluster") +public class ClusterProperties { + + /** + * Cluster worker + */ + private Worker worker = new Worker(); + + /** + * Ping timeout(ms). + */ + private Long pingTimeout = 3000L; + + /** + * Spread node size. + */ + private Integer spreadSize = 3; + + /** + * Spread retry times. + */ + private Integer spreadRetryTimes = 3; + + /** + * Node fail times. + */ + private Integer nodeFailTimes = 3; + + /** + * Node success times. + */ + private Integer nodeSuccessTimes = 3; + + /** + * Cluster node fail period time(ms). + */ + private Integer nodeFailPeriodTime = 15000; + + /** + * Cluster node success period time(ms). + */ + private Integer nodeSuccessPeriodTime = 15000; + + @Data + public static class Worker { + + /** + * Worker online period(s). + */ + private Integer onlinePeriod = 10; + + /** + * Worker offline period(s). + */ + private Integer offlinePeriod = 20; + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenClusterController.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenClusterController.java index 88a91a65..f0af24aa 100644 --- a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenClusterController.java +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenClusterController.java @@ -2,7 +2,7 @@ import io.openjob.common.response.Result; import io.openjob.server.openapi.request.ClusterOnlineRequest; -import io.openjob.server.openapi.service.ClusterService; +import io.openjob.server.openapi.service.OpenClusterService; import io.openjob.server.openapi.vo.ClusterOnlineVO; import io.swagger.annotations.Api; import org.springframework.beans.factory.annotation.Autowired; @@ -21,15 +21,15 @@ @Api(value = "Cluster", tags = "Cluster") @RequestMapping("/openapi/cluster") public class OpenClusterController { - private final ClusterService clusterService; + private final OpenClusterService openClusterService; @Autowired - public OpenClusterController(ClusterService clusterService) { - this.clusterService = clusterService; + public OpenClusterController(OpenClusterService openClusterService) { + this.openClusterService = openClusterService; } @GetMapping("/online") public Result add(@Valid @ModelAttribute ClusterOnlineRequest clusterOnlineRequest) { - return Result.success(this.clusterService.online(clusterOnlineRequest)); + return Result.success(this.openClusterService.online(clusterOnlineRequest)); } } diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenJobInstanceController.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenJobInstanceController.java new file mode 100644 index 00000000..11174cc3 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenJobInstanceController.java @@ -0,0 +1,45 @@ +package io.openjob.server.openapi.controller; + +import io.openjob.common.response.Result; +import io.openjob.server.openapi.request.WorkerJobInstanceStatusRequest; +import io.openjob.server.openapi.request.WorkerJobInstanceTaskBatchRequest; +import io.openjob.server.openapi.service.OpenJobInstanceService; +import io.openjob.server.openapi.vo.WorkerJobInstanceStatusVO; +import io.openjob.server.openapi.vo.WorkerJobInstanceTaskBatchVO; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@RestController +@Api(value = "Job-instance", tags = "Job-instance") +@RequestMapping("/openapi/job-instance") +@Slf4j +public class OpenJobInstanceController { + + private final OpenJobInstanceService openJobInstanceService; + + @Autowired + public OpenJobInstanceController(OpenJobInstanceService openJobInstanceService) { + this.openJobInstanceService = openJobInstanceService; + } + + @ApiOperation("Handle status") + @PostMapping("/handle-status") + public Result handleStatus(WorkerJobInstanceStatusRequest statusRequest) { + return Result.success(this.openJobInstanceService.handleStatus(statusRequest)); + } + + @ApiOperation("Handle tasks") + @PostMapping("/handle-tasks") + public Result handleTasks(WorkerJobInstanceTaskBatchRequest batchRequest) { + return Result.success(this.openJobInstanceService.handleTasks(batchRequest)); + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenTaskLogController.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenTaskLogController.java new file mode 100644 index 00000000..f1df6a00 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenTaskLogController.java @@ -0,0 +1,40 @@ +package io.openjob.server.openapi.controller; + +import io.openjob.common.response.Result; +import io.openjob.server.openapi.request.WorkerJobInstanceTaskLogRequest; +import io.openjob.server.openapi.service.OpenTaskLogService; +import io.openjob.server.openapi.vo.WorkerJobInstanceTaskLogVO; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@RestController +@Api(value = "Task-log", tags = "Task-log") +@RequestMapping("/openapi/task-log") +public class OpenTaskLogController { + + private final OpenTaskLogService openTaskLogService; + + @Autowired + public OpenTaskLogController(OpenTaskLogService openTaskLogService) { + this.openTaskLogService = openTaskLogService; + } + + /** + * During the execution of a task on the client, there is periodic real-time batch reporting of task logs. + * + * @param logRequest log request. + */ + @ApiOperation("batch add") + @PostMapping("/batch-add") + public Result batchAdd(WorkerJobInstanceTaskLogRequest logRequest) { + return Result.success(this.openTaskLogService.batchAdd(logRequest)); + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenWorkerController.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenWorkerController.java new file mode 100644 index 00000000..cb94346c --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/controller/OpenWorkerController.java @@ -0,0 +1,71 @@ +package io.openjob.server.openapi.controller; + +import io.openjob.common.response.Result; +import io.openjob.server.openapi.request.WorkerHeartbeatRequest; +import io.openjob.server.openapi.request.WorkerStartRequest; +import io.openjob.server.openapi.request.WorkerStopRequest; +import io.openjob.server.openapi.service.OpenWorkerService; +import io.openjob.server.openapi.vo.ServerHeartbeatVO; +import io.openjob.server.openapi.vo.ServerWorkerStartVO; +import io.openjob.server.openapi.vo.ServerWorkerStopVO; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.validation.Valid; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@RestController +@Api(value = "worker", tags = "worker") +@RequestMapping("/openapi/worker") +@Slf4j +public class OpenWorkerController { + + private final OpenWorkerService openWorkerService; + + public OpenWorkerController(OpenWorkerService openWorkerService) { + this.openWorkerService = openWorkerService; + } + + /** + * Client needs to report client information every time it starts. + * + * @param workerStartRequest worker start request + * @return Result + */ + @ApiOperation("Worker start") + @PostMapping("/start") + public Result workerStart(@Valid @RequestBody WorkerStartRequest workerStartRequest) { + ServerWorkerStartVO result = this.openWorkerService.workerStart(workerStartRequest); + log.info("Worker register success! address={}", workerStartRequest.getAddress()); + return Result.success(result); + } + + /** + * Client needs to notify the server every time it goes offline. + * + * @param workerStopRequest worker stop request + * @return Result + */ + @ApiOperation("Worker stop") + @PostMapping("/stop") + public Result workerStop(@Valid @RequestBody WorkerStopRequest workerStopRequest) { + ServerWorkerStopVO result = this.openWorkerService.workerStop(workerStopRequest); + log.info("Worker stop success! address={}", workerStopRequest.getAddress()); + return Result.success(result); + } + + @ApiOperation("worker heartbeat") + @PostMapping("/heartbeat") + public Result heartbeat(@Valid @RequestBody WorkerHeartbeatRequest workerHeartbeatRequest) { + ServerHeartbeatVO result = this.openWorkerService.workerHeartbeat(workerHeartbeatRequest); + return Result.success(result); + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/exception/ClusterNodeOperatingException.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/exception/ClusterNodeOperatingException.java new file mode 100644 index 00000000..c9b54e9f --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/exception/ClusterNodeOperatingException.java @@ -0,0 +1,17 @@ +package io.openjob.server.openapi.exception; + +/** + * @author stelin swoft@qq.com + * @since 1.0.0 + */ +public class ClusterNodeOperatingException extends RuntimeException { + + /** + * New exception. + * + * @param message message + */ + public ClusterNodeOperatingException(String message) { + super(message); + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/DelayInstanceStatusRequest.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/DelayInstanceStatusRequest.java new file mode 100644 index 00000000..f4827727 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/DelayInstanceStatusRequest.java @@ -0,0 +1,66 @@ +package io.openjob.server.openapi.request; + +import io.openjob.common.constant.FailStatusEnum; +import lombok.Data; + +import java.util.List; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class DelayInstanceStatusRequest { + + private List list; + + @Data + public static class DelayInstanceTaskRequest { + /** + * Topic + */ + private String topic; + + /** + * Delay id. + */ + private Long delayId; + + /** + * Delay pid + */ + private Long delayPid; + + /** + * Task id. + */ + private String taskId; + + /** + * Task status. + */ + private Integer status; + + /** + * Fail status + * + * @see FailStatusEnum#getStatus() + */ + private Integer failStatus; + + /** + * Task result. + */ + private String result; + + /** + * Worker address + */ + private String workerAddress; + + /** + * Complete time + */ + private Long completeTime; + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/DelayPullInstanceRequest.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/DelayPullInstanceRequest.java new file mode 100644 index 00000000..4ac5250f --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/DelayPullInstanceRequest.java @@ -0,0 +1,21 @@ +package io.openjob.server.openapi.request; + +import lombok.Data; + +import java.util.List; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class DelayPullInstanceRequest { + private String workerAddress; + private List list; + + @Data + public static class DelayPullTopicItemRequest { + private String topic; + private Integer size; + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/DelayPullTopicRequest.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/DelayPullTopicRequest.java new file mode 100644 index 00000000..58a77036 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/DelayPullTopicRequest.java @@ -0,0 +1,12 @@ +package io.openjob.server.openapi.request; + +import lombok.Data; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class DelayPullTopicRequest { + private String appName; +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerHeartbeatRequest.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerHeartbeatRequest.java new file mode 100644 index 00000000..e82cbd8b --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerHeartbeatRequest.java @@ -0,0 +1,38 @@ +package io.openjob.server.openapi.request; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Pattern; +import java.util.List; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Data +public class WorkerHeartbeatRequest { + + @NotNull + @ApiModelProperty("App id") + private Long appId; + + @NotBlank + @ApiModelProperty("App name") + private String appName; + + @NotBlank + @Pattern(regexp = "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}:\\d{1,5}$", message = "must be in format 'IP:PORT'") + @ApiModelProperty("Worker address.") + private String address; + + @NotBlank + @ApiModelProperty("Worker version") + private String version; + + @NotNull + @ApiModelProperty(" Running job instance ids.") + private List runningJobInstanceIds; +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerJobInstanceStatusRequest.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerJobInstanceStatusRequest.java new file mode 100644 index 00000000..4e5dd6cf --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerJobInstanceStatusRequest.java @@ -0,0 +1,46 @@ +package io.openjob.server.openapi.request; + +import io.openjob.common.constant.FailStatusEnum; +import io.openjob.common.constant.InstanceStatusEnum; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Data +public class WorkerJobInstanceStatusRequest { + + @NotNull + @ApiModelProperty("Job id.") + private Long jobId; + + @NotNull + @ApiModelProperty("Job instance id.") + private Long jobInstanceId; + + @ApiModelProperty("Current circleId. Only for second delay task.") + private Long circleId = 0L; + + /** + * @see InstanceStatusEnum + */ + @NotNull + @ApiModelProperty("Job instance status.") + private Integer status; + + /** + * @see FailStatusEnum#getStatus() + */ + @NotNull + @ApiModelProperty("Fail status") + private Integer failStatus; + + @NotBlank + @ApiModelProperty("Result") + private String result; +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerJobInstanceTaskBatchRequest.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerJobInstanceTaskBatchRequest.java new file mode 100644 index 00000000..361246c5 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerJobInstanceTaskBatchRequest.java @@ -0,0 +1,72 @@ +package io.openjob.server.openapi.request; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import java.io.Serializable; +import java.util.List; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerJobInstanceTaskBatchRequest { + + @ApiModelProperty("Job instance task list.Aggregation many circle task, if second delay task.") + private List taskRequestList; + + @Data + public static class WorkerJobInstanceTaskRequest implements Serializable { + + @NotNull + @ApiModelProperty("Job id.") + private Long jobId; + + @NotNull + @ApiModelProperty("Job instance id.") + private Long jobInstanceId; + + @NotNull + @ApiModelProperty("Dispatch version") + private Long dispatchVersion; + + @NotNull + @ApiModelProperty("Only for second delay task.") + private Long circleId; + + @NotBlank + @ApiModelProperty("Task unique id.") + private String taskId; + + @NotBlank + @ApiModelProperty("Task parent unique id.") + private String parentTaskId; + + @NotBlank + @ApiModelProperty("Task name.") + private String taskName; + + @NotNull + @ApiModelProperty("Task status.") + private Integer status; + + @NotBlank + @ApiModelProperty("Task result.") + private String result; + + @NotBlank + @ApiModelProperty("Task worker address") + private String workerAddress; + + @NotNull + @ApiModelProperty("Task create time.") + private Long createTime; + + @NotNull + @ApiModelProperty("Task update time.") + private Long updateTime; + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerJobInstanceTaskLogRequest.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerJobInstanceTaskLogRequest.java new file mode 100644 index 00000000..5d816632 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerJobInstanceTaskLogRequest.java @@ -0,0 +1,32 @@ +package io.openjob.server.openapi.request; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import java.util.List; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Data +public class WorkerJobInstanceTaskLogRequest { + + @NotNull + @ApiModelProperty(value = "Field list", required = true) + private List> fieldList; + + @Data + public static class WorkerJobInstanceTaskLogFieldRequest { + + @NotBlank + @ApiModelProperty(value = "name", required = true) + private String name; + + @NotBlank + @ApiModelProperty(value = "value", required = true) + private String value; + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerStartRequest.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerStartRequest.java new file mode 100644 index 00000000..89ca45b9 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerStartRequest.java @@ -0,0 +1,44 @@ +package io.openjob.server.openapi.request; + +import io.openjob.common.constant.ProtocolTypeEnum; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.Pattern; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Data +public class WorkerStartRequest { + + @NotBlank + @ApiModelProperty("Worker unique id.") + private String workerKey; + + @NotBlank + @ApiModelProperty("App name") + private String appName; + + @NotBlank + @Pattern(regexp = "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}:\\d{1,5}$", message = "must be in format 'IP:PORT'") + @ApiModelProperty("Worker address") + private String address; + + @NotBlank + @ApiModelProperty("Worker agent version") + private String version; + + @ApiModelProperty("Worker current protocol type, http is default.") + private String protocolType = ProtocolTypeEnum.HTTP.getType(); + + @ApiModelProperty("metric") + private Metric metric; + + @Data + public static class Metric { + + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerStopRequest.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerStopRequest.java new file mode 100644 index 00000000..a2030698 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/request/WorkerStopRequest.java @@ -0,0 +1,28 @@ +package io.openjob.server.openapi.request; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.Pattern; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Data +public class WorkerStopRequest { + + @NotBlank + @ApiModelProperty("Worker unique id.") + private String workerKey; + + @NotBlank + @ApiModelProperty("App name") + private String appName; + + @NotBlank + @Pattern(regexp = "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}:\\d{1,5}$", message = "must be in format 'IP:PORT'") + @ApiModelProperty("worker address") + private String address; +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/ClusterService.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenClusterService.java similarity index 91% rename from openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/ClusterService.java rename to openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenClusterService.java index 22a5b455..9953b666 100644 --- a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/ClusterService.java +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenClusterService.java @@ -7,7 +7,7 @@ * @author stelin swoft@qq.com * @since 1.0.3 */ -public interface ClusterService { +public interface OpenClusterService { /** * online diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenJobInstanceService.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenJobInstanceService.java new file mode 100644 index 00000000..1f18de27 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenJobInstanceService.java @@ -0,0 +1,29 @@ +package io.openjob.server.openapi.service; + +import io.openjob.server.openapi.request.WorkerJobInstanceStatusRequest; +import io.openjob.server.openapi.request.WorkerJobInstanceTaskBatchRequest; +import io.openjob.server.openapi.vo.WorkerJobInstanceStatusVO; +import io.openjob.server.openapi.vo.WorkerJobInstanceTaskBatchVO; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +public interface OpenJobInstanceService { + + /** + * Handle status + * + * @param statusRequest statusRequest + * @return WorkerJobInstanceStatusVO + */ + WorkerJobInstanceStatusVO handleStatus(WorkerJobInstanceStatusRequest statusRequest); + + /** + * Handle tasks + * + * @param batchRequest batchRequest + * @return WorkerJobInstanceTaskBatchVO + */ + WorkerJobInstanceTaskBatchVO handleTasks(WorkerJobInstanceTaskBatchRequest batchRequest); +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenTaskLogService.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenTaskLogService.java new file mode 100644 index 00000000..5a5edadb --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenTaskLogService.java @@ -0,0 +1,19 @@ +package io.openjob.server.openapi.service; + +import io.openjob.server.openapi.request.WorkerJobInstanceTaskLogRequest; +import io.openjob.server.openapi.vo.WorkerJobInstanceTaskLogVO; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +public interface OpenTaskLogService { + + /** + * Batch add + * + * @param logRequest logRequest + * @return WorkerJobInstanceTaskLogVO + */ + WorkerJobInstanceTaskLogVO batchAdd(WorkerJobInstanceTaskLogRequest logRequest); +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenWorkerService.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenWorkerService.java new file mode 100644 index 00000000..a3346ceb --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/OpenWorkerService.java @@ -0,0 +1,39 @@ +package io.openjob.server.openapi.service; + +import io.openjob.server.openapi.request.WorkerHeartbeatRequest; +import io.openjob.server.openapi.request.WorkerStartRequest; +import io.openjob.server.openapi.request.WorkerStopRequest; +import io.openjob.server.openapi.vo.ServerHeartbeatVO; +import io.openjob.server.openapi.vo.ServerWorkerStartVO; +import io.openjob.server.openapi.vo.ServerWorkerStopVO; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +public interface OpenWorkerService { + + /** + * Worker start + * + * @param startRequest start request. + * @return Register success response. + */ + ServerWorkerStartVO workerStart(WorkerStartRequest startRequest); + + /** + * Worker stop + * + * @param stopRequest stop request. + * @return Stop success response. + */ + ServerWorkerStopVO workerStop(WorkerStopRequest stopRequest); + + /** + * Worker heartbeat + * + * @param workerHeartbeatRequest heartbeat request. + * @return Current server all worker address + */ + ServerHeartbeatVO workerHeartbeat(WorkerHeartbeatRequest workerHeartbeatRequest); +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/ClusterServiceImpl.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenClusterServiceImpl.java similarity index 86% rename from openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/ClusterServiceImpl.java rename to openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenClusterServiceImpl.java index bb4738f1..8eb2426f 100644 --- a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/ClusterServiceImpl.java +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenClusterServiceImpl.java @@ -3,7 +3,7 @@ import io.openjob.common.context.Node; import io.openjob.server.common.ClusterContext; import io.openjob.server.openapi.request.ClusterOnlineRequest; -import io.openjob.server.openapi.service.ClusterService; +import io.openjob.server.openapi.service.OpenClusterService; import io.openjob.server.openapi.vo.ClusterOnlineVO; import org.springframework.stereotype.Service; @@ -15,7 +15,7 @@ * @since 1.0.3 */ @Service -public class ClusterServiceImpl implements ClusterService { +public class OpenClusterServiceImpl implements OpenClusterService { @Override public ClusterOnlineVO online(ClusterOnlineRequest clusterOnlineRequest) { ClusterOnlineVO clusterOnlineVO = new ClusterOnlineVO(); diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenDelayInstanceServiceImpl.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenDelayInstanceServiceImpl.java index 02ee5187..25a46373 100644 --- a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenDelayInstanceServiceImpl.java +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenDelayInstanceServiceImpl.java @@ -7,13 +7,13 @@ import io.openjob.server.scheduler.dto.DelayInstanceAddResponseDTO; import io.openjob.server.scheduler.scheduler.DelayInstanceScheduler; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; /** * @author stelin swoft@qq.com * @since 1.0.0 */ -@Component +@Service public class OpenDelayInstanceServiceImpl implements OpenDelayInstanceService { private final DelayInstanceScheduler delayInstanceScheduler; diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenJobInstanceServiceImpl.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenJobInstanceServiceImpl.java new file mode 100644 index 00000000..d49ba526 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenJobInstanceServiceImpl.java @@ -0,0 +1,42 @@ +package io.openjob.server.openapi.service.impl; + +import io.openjob.server.cluster.dto.WorkerJobInstanceStatusReqDTO; +import io.openjob.server.cluster.dto.WorkerJobInstanceStatusRespDTO; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskBatchReqDTO; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskBatchRespDTO; +import io.openjob.server.cluster.manager.JobInstanceManager; +import io.openjob.server.common.util.BeanMapperUtil; +import io.openjob.server.openapi.request.WorkerJobInstanceStatusRequest; +import io.openjob.server.openapi.request.WorkerJobInstanceTaskBatchRequest; +import io.openjob.server.openapi.service.OpenJobInstanceService; +import io.openjob.server.openapi.vo.WorkerJobInstanceStatusVO; +import io.openjob.server.openapi.vo.WorkerJobInstanceTaskBatchVO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Service +public class OpenJobInstanceServiceImpl implements OpenJobInstanceService { + + private final JobInstanceManager jobInstanceManager; + + @Autowired + public OpenJobInstanceServiceImpl(JobInstanceManager jobInstanceManager) { + this.jobInstanceManager = jobInstanceManager; + } + + @Override + public WorkerJobInstanceStatusVO handleStatus(WorkerJobInstanceStatusRequest statusRequest) { + WorkerJobInstanceStatusRespDTO respDTO = this.jobInstanceManager.handleInstanceStatus(BeanMapperUtil.map(statusRequest, WorkerJobInstanceStatusReqDTO.class)); + return BeanMapperUtil.map(respDTO, WorkerJobInstanceStatusVO.class); + } + + @Override + public WorkerJobInstanceTaskBatchVO handleTasks(WorkerJobInstanceTaskBatchRequest batchRequest) { + WorkerJobInstanceTaskBatchRespDTO respDTO = this.jobInstanceManager.handleInstanceTasks(BeanMapperUtil.map(batchRequest, WorkerJobInstanceTaskBatchReqDTO.class)); + return BeanMapperUtil.map(respDTO, WorkerJobInstanceTaskBatchVO.class); + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenTaskLogServiceImpl.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenTaskLogServiceImpl.java new file mode 100644 index 00000000..ad630f8d --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenTaskLogServiceImpl.java @@ -0,0 +1,32 @@ +package io.openjob.server.openapi.service.impl; + +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskLogReqDTO; +import io.openjob.server.cluster.dto.WorkerJobInstanceTaskLogRespDTO; +import io.openjob.server.cluster.manager.JobInstanceTaskLogManager; +import io.openjob.server.common.util.BeanMapperUtil; +import io.openjob.server.openapi.request.WorkerJobInstanceTaskLogRequest; +import io.openjob.server.openapi.service.OpenTaskLogService; +import io.openjob.server.openapi.vo.WorkerJobInstanceTaskLogVO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Service +public class OpenTaskLogServiceImpl implements OpenTaskLogService { + + private final JobInstanceTaskLogManager jobInstanceTaskLogManager; + + @Autowired + public OpenTaskLogServiceImpl(JobInstanceTaskLogManager jobInstanceTaskLogManager) { + this.jobInstanceTaskLogManager = jobInstanceTaskLogManager; + } + + @Override + public WorkerJobInstanceTaskLogVO batchAdd(WorkerJobInstanceTaskLogRequest logRequest) { + WorkerJobInstanceTaskLogRespDTO respDTO = this.jobInstanceTaskLogManager.handleInstanceTaskLog(BeanMapperUtil.map(logRequest, WorkerJobInstanceTaskLogReqDTO.class)); + return BeanMapperUtil.map(respDTO, WorkerJobInstanceTaskLogVO.class); + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenWorkerServiceImpl.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenWorkerServiceImpl.java new file mode 100644 index 00000000..ae988f00 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/service/impl/OpenWorkerServiceImpl.java @@ -0,0 +1,58 @@ +package io.openjob.server.openapi.service.impl; + +import io.openjob.server.cluster.dto.WorkerHeartbeatReqDTO; +import io.openjob.server.cluster.dto.WorkerHeartbeatRespDTO; +import io.openjob.server.cluster.dto.WorkerStartReqDTO; +import io.openjob.server.cluster.dto.WorkerStartRespDTO; +import io.openjob.server.cluster.dto.WorkerStopReqDTO; +import io.openjob.server.cluster.dto.WorkerStopRespDTO; +import io.openjob.server.cluster.manager.WorkerHeartbeatManager; +import io.openjob.server.cluster.manager.WorkerManager; +import io.openjob.server.common.util.BeanMapperUtil; +import io.openjob.server.openapi.request.WorkerHeartbeatRequest; +import io.openjob.server.openapi.request.WorkerStartRequest; +import io.openjob.server.openapi.request.WorkerStopRequest; +import io.openjob.server.openapi.service.OpenWorkerService; +import io.openjob.server.openapi.vo.ServerHeartbeatVO; +import io.openjob.server.openapi.vo.ServerWorkerStartVO; +import io.openjob.server.openapi.vo.ServerWorkerStopVO; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Service +@Log4j2 +public class OpenWorkerServiceImpl implements OpenWorkerService { + private final WorkerManager workerManager; + private final WorkerHeartbeatManager workerHeartbeatManager; + + @Autowired + public OpenWorkerServiceImpl(WorkerManager workerManager, WorkerHeartbeatManager workerHeartbeatManager) { + this.workerManager = workerManager; + this.workerHeartbeatManager = workerHeartbeatManager; + } + + @Override + public ServerWorkerStartVO workerStart(WorkerStartRequest startRequest) { + WorkerStartReqDTO reqDTO = BeanMapperUtil.map(startRequest, WorkerStartReqDTO.class); + WorkerStartRespDTO workerStartRespDTO = this.workerManager.workerStart(reqDTO); + return BeanMapperUtil.map(workerStartRespDTO, ServerWorkerStartVO.class); + } + + @Override + public ServerWorkerStopVO workerStop(WorkerStopRequest stopReq) { + WorkerStopReqDTO reqDTO = BeanMapperUtil.map(stopReq, WorkerStopReqDTO.class); + WorkerStopRespDTO workerStopRespDTO = this.workerManager.workerStop(reqDTO); + return BeanMapperUtil.map(workerStopRespDTO, ServerWorkerStopVO.class); + } + + @Override + public ServerHeartbeatVO workerHeartbeat(WorkerHeartbeatRequest workerHeartbeatRequest) { + WorkerHeartbeatRespDTO workerHeartbeatRespDTO = this.workerHeartbeatManager.workerHeartbeat(BeanMapperUtil.map(workerHeartbeatRequest, WorkerHeartbeatReqDTO.class)); + return BeanMapperUtil.map(workerHeartbeatRespDTO, ServerHeartbeatVO.class); + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/DelayInstanceStatusVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/DelayInstanceStatusVO.java new file mode 100644 index 00000000..534bfe8c --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/DelayInstanceStatusVO.java @@ -0,0 +1,8 @@ +package io.openjob.server.openapi.vo; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +public class DelayInstanceStatusVO { +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/DelayPullInstanceVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/DelayPullInstanceVO.java new file mode 100644 index 00000000..27238f8e --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/DelayPullInstanceVO.java @@ -0,0 +1,32 @@ +package io.openjob.server.openapi.vo; + +import lombok.Data; + +import java.util.List; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class DelayPullInstanceVO { + private List list; + + @Data + public static class DelayPullInstanceItemVO { + private String topic; + private String taskId; + private Long delayId; + private Long delayPid; + private String delayParams; + private String delayExtra; + private String processorInfo; + private Integer failRetryTimes; + private Integer failRetryInterval; + private Integer executeTimeout; + private Integer blockingSize; + private Integer concurrency; + private Integer failTopicEnable; + private Integer failTopicConcurrency; + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/DelayPullTopicVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/DelayPullTopicVO.java new file mode 100644 index 00000000..a002ddd8 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/DelayPullTopicVO.java @@ -0,0 +1,28 @@ +package io.openjob.server.openapi.vo; + +import lombok.Data; + +import java.util.List; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class DelayPullTopicVO { + private List topicList; + + @Data + public static class DelayTopicVO { + private Long id; + private Long pid; + private String topic; + private String processorInfo; + private Integer failRetryTimes; + private Integer failRetryInterval; + private Integer executeTimeout; + private Integer concurrency; + private Integer failTopicEnable; + private Integer failTopicConcurrency; + } +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/JobInstanceVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/JobInstanceVO.java new file mode 100644 index 00000000..4480a042 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/JobInstanceVO.java @@ -0,0 +1,18 @@ +package io.openjob.server.openapi.vo; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Data +public class JobInstanceVO { + + @ApiModelProperty(value = "Job Name", required = true) + private String jobName; + + @ApiModelProperty(value = "Data") + private String data; +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerHeartbeatVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerHeartbeatVO.java new file mode 100644 index 00000000..65cb8cbe --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerHeartbeatVO.java @@ -0,0 +1,23 @@ +package io.openjob.server.openapi.vo; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.util.Set; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Data +public class ServerHeartbeatVO { + + @ApiModelProperty("Worker address list.") + private Set workerAddressList; + + @ApiModelProperty("Cluster version") + private Long clusterVersion; + + @ApiModelProperty("Cluster delay version") + private Long clusterDelayVersion; +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerSubmitJobInstanceVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerSubmitJobInstanceVO.java new file mode 100644 index 00000000..cc9f4f08 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerSubmitJobInstanceVO.java @@ -0,0 +1,11 @@ +package io.openjob.server.openapi.vo; + +import lombok.Data; + +/** + * @author stelin swoft@qq.com + * @since 1.0.0 + */ +@Data +public class ServerSubmitJobInstanceVO { +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerWorkerStartVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerWorkerStartVO.java new file mode 100644 index 00000000..1e4005b0 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerWorkerStartVO.java @@ -0,0 +1,23 @@ +package io.openjob.server.openapi.vo; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.util.Set; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Data +public class ServerWorkerStartVO { + + @ApiModelProperty(value = "App id", required = true) + private Long appId; + + @ApiModelProperty(value = "App name", required = true) + private String appName; + + @ApiModelProperty(value = "Online workers and exclude start worker.", required = true) + private Set workerAddressList; +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerWorkerStopVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerWorkerStopVO.java new file mode 100644 index 00000000..cf7ad924 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/ServerWorkerStopVO.java @@ -0,0 +1,11 @@ +package io.openjob.server.openapi.vo; + +import lombok.Data; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Data +public class ServerWorkerStopVO { +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobInstanceStatusVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobInstanceStatusVO.java new file mode 100644 index 00000000..03f7c2a7 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobInstanceStatusVO.java @@ -0,0 +1,11 @@ +package io.openjob.server.openapi.vo; + +import lombok.Data; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerJobInstanceStatusVO { +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobInstanceTaskBatchVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobInstanceTaskBatchVO.java new file mode 100644 index 00000000..cfd039c1 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobInstanceTaskBatchVO.java @@ -0,0 +1,11 @@ +package io.openjob.server.openapi.vo; + +import lombok.Data; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerJobInstanceTaskBatchVO { +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobInstanceTaskLogVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobInstanceTaskLogVO.java new file mode 100644 index 00000000..28282687 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobInstanceTaskLogVO.java @@ -0,0 +1,11 @@ +package io.openjob.server.openapi.vo; + +import lombok.Data; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Data +public class WorkerJobInstanceTaskLogVO { +} diff --git a/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobVO.java b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobVO.java new file mode 100644 index 00000000..034276e5 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/main/java/io/openjob/server/openapi/vo/WorkerJobVO.java @@ -0,0 +1,15 @@ +package io.openjob.server.openapi.vo; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @author zhenghongyang sakuraovq@gmail.com + * @since 1.0.0 + */ +@Data +public class WorkerJobVO { + + @ApiModelProperty(value = "Delivery id.", required = true) + private Long deliveryId; +} diff --git a/openjob-server/openjob-server-openapi/src/test/httptest/worker.http b/openjob-server/openjob-server-openapi/src/test/httptest/worker.http new file mode 100644 index 00000000..441cc311 --- /dev/null +++ b/openjob-server/openjob-server-openapi/src/test/httptest/worker.http @@ -0,0 +1,35 @@ +### Worker start +POST http://localhost:8080/openapi/worker/start +Content-Type: application/json + +{ + "address": "127.0.0.2:8088", + "appName": "openjob", + "metric": {}, + "version": "1.0.6", + "workerKey": "SZ2C7FNQRkktQdv" +} + +### Worker start +POST http://localhost:8080/openapi/worker/stop +Content-Type: application/json + +{ + "address": "127.0.0.2:8088", + "appName": "openjob", + "workerKey": "SZ2C7FNQRkktQdv" +} + + +### Worker heartbeat +POST http://localhost:8080/openapi/worker/heartbeat +Content-Type: application/json + +{ + "address": "127.0.0.2:8088", + "appId": 1, + "appName": "openjob", + "runningJobInstanceIds": [ + ], + "version": "1.0.6" +} \ No newline at end of file diff --git a/openjob-server/openjob-server-repository/pom.xml b/openjob-server/openjob-server-repository/pom.xml index e04dddd1..3823f476 100644 --- a/openjob-server/openjob-server-repository/pom.xml +++ b/openjob-server/openjob-server-repository/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.7 + 1.0.8 4.0.0 openjob-server-repository diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/entity/Worker.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/entity/Worker.java index f9553636..09cc8a76 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/entity/Worker.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/entity/Worker.java @@ -23,7 +23,7 @@ public class Worker { @Id @Column(name = "`id`") @GeneratedValue(strategy = GenerationType.AUTO, generator = "native") - @GenericGenerator(name = "native", strategy = "native",parameters = {@Parameter(name = "sequence_name", value = "worker_id")}) + @GenericGenerator(name = "native", strategy = "native", parameters = {@Parameter(name = "sequence_name", value = "worker_id")}) private Long id; /** diff --git a/openjob-server/openjob-server-scheduler/pom.xml b/openjob-server/openjob-server-scheduler/pom.xml index c09c9833..fef9e9bf 100644 --- a/openjob-server/openjob-server-scheduler/pom.xml +++ b/openjob-server/openjob-server-scheduler/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.7 + 1.0.8 4.0.0 diff --git a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/dto/WorkerDelayInstanceStatusDTO.java b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/dto/WorkerDelayInstanceStatusDTO.java new file mode 100644 index 00000000..4321fe59 --- /dev/null +++ b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/dto/WorkerDelayInstanceStatusDTO.java @@ -0,0 +1,66 @@ +package io.openjob.server.scheduler.dto; + +import io.openjob.common.constant.FailStatusEnum; +import lombok.Data; + +import java.util.List; + +/** + * @author stelin swoft@qq.com + * @since 1.0.8 + */ +@Data +public class WorkerDelayInstanceStatusDTO { + + private List taskList; + + @Data + public static class WorkerDelayInstanceTaskDTO { + /** + * Topic + */ + private String topic; + + /** + * Delay id. + */ + private Long delayId; + + /** + * Delay pid + */ + private Long delayPid; + + /** + * Task id. + */ + private String taskId; + + /** + * Task status. + */ + private Integer status; + + /** + * Fail status + * + * @see FailStatusEnum#getStatus() + */ + private Integer failStatus; + + /** + * Task result. + */ + private String result; + + /** + * Worker address + */ + private String workerAddress; + + /** + * Complete time + */ + private Long completeTime; + } +} diff --git a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/SchedulerTimerService.java b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/SchedulerTimerService.java index 9cfc41a8..034c66dd 100644 --- a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/SchedulerTimerService.java +++ b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/SchedulerTimerService.java @@ -79,26 +79,7 @@ public void run(SchedulerTimerTask task) { */ public void doRun(SchedulerTimerTask task, Set failoverList) { Long dispatchVersion = DateUtil.milliLongTime(); - ServerSubmitJobInstanceRequest submitReq = new ServerSubmitJobInstanceRequest(); - submitReq.setJobId(task.getJobId()); - submitReq.setJobInstanceId(task.getTaskId()); - submitReq.setCircleId(task.getCircleId()); - submitReq.setDispatchVersion(dispatchVersion); - submitReq.setJobParamType(task.getJobParamType()); - submitReq.setJobParams(task.getJobParams()); - submitReq.setJobExtendParamsType(task.getJobExtendParamsType()); - submitReq.setJobExtendParams(task.getJobExtendParams()); - submitReq.setWorkflowId(task.getWorkflowId()); - submitReq.setProcessorType(task.getProcessorType()); - submitReq.setProcessorInfo(task.getProcessorInfo()); - submitReq.setExecuteType(task.getExecuteType()); - submitReq.setFailRetryTimes(task.getFailRetryTimes()); - submitReq.setFailRetryInterval(task.getFailRetryInterval()); - submitReq.setConcurrency(task.getConcurrency()); - submitReq.setTimeExpressionType(task.getTimeExpressionType()); - submitReq.setTimeExpression(task.getTimeExpression()); - submitReq.setExecuteTimeout(task.getExecuteTimeout()); - submitReq.setExecuteOnce(Optional.ofNullable(task.getExecuteOnce()).orElse(CommonConstant.NO)); + ServerSubmitJobInstanceRequest submitReq = this.getServerSubmitJobInstanceRequest(task, dispatchVersion); WorkerDTO workerDTO = WorkerUtil.selectWorkerByAppId(task.getAppid(), failoverList); if (Objects.isNull(workerDTO)) { @@ -202,4 +183,28 @@ private void addInstanceLog(Long jobId, Long instanceId, String message) { jobInstanceLog.setUpdateTime(timestamp); this.jobInstanceLogDAO.save(jobInstanceLog); } + + private ServerSubmitJobInstanceRequest getServerSubmitJobInstanceRequest(SchedulerTimerTask task, Long dispatchVersion) { + ServerSubmitJobInstanceRequest submitReq = new ServerSubmitJobInstanceRequest(); + submitReq.setJobId(task.getJobId()); + submitReq.setJobInstanceId(task.getTaskId()); + submitReq.setCircleId(task.getCircleId()); + submitReq.setDispatchVersion(dispatchVersion); + submitReq.setJobParamType(task.getJobParamType()); + submitReq.setJobParams(task.getJobParams()); + submitReq.setJobExtendParamsType(task.getJobExtendParamsType()); + submitReq.setJobExtendParams(task.getJobExtendParams()); + submitReq.setWorkflowId(task.getWorkflowId()); + submitReq.setProcessorType(task.getProcessorType()); + submitReq.setProcessorInfo(task.getProcessorInfo()); + submitReq.setExecuteType(task.getExecuteType()); + submitReq.setFailRetryTimes(task.getFailRetryTimes()); + submitReq.setFailRetryInterval(task.getFailRetryInterval()); + submitReq.setConcurrency(task.getConcurrency()); + submitReq.setTimeExpressionType(task.getTimeExpressionType()); + submitReq.setTimeExpression(task.getTimeExpression()); + submitReq.setExecuteTimeout(task.getExecuteTimeout()); + submitReq.setExecuteOnce(Optional.ofNullable(task.getExecuteOnce()).orElse(CommonConstant.NO)); + return submitReq; + } } diff --git a/openjob-server/openjob-server-starter/pom.xml b/openjob-server/openjob-server-starter/pom.xml index 3f751870..ea0c105c 100644 --- a/openjob-server/openjob-server-starter/pom.xml +++ b/openjob-server/openjob-server-starter/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.7 + 1.0.8 4.0.0 diff --git a/openjob-server/openjob-server-starter/src/main/resources/application.properties b/openjob-server/openjob-server-starter/src/main/resources/application.properties index 4798332d..0f4ad9ea 100644 --- a/openjob-server/openjob-server-starter/src/main/resources/application.properties +++ b/openjob-server/openjob-server-starter/src/main/resources/application.properties @@ -1,5 +1,5 @@ server.port=${SERVER_PORT:8080} -#spring.profiles.active=mysql +spring.profiles.active=mysql #spring.profiles.active=pgsql #spring.profiles.active=oracle #spring.profiles.active=tidb diff --git a/openjob-server/openjob-server-starter/src/main/resources/banner.txt b/openjob-server/openjob-server-starter/src/main/resources/banner.txt index b1595b74..59988370 100644 --- a/openjob-server/openjob-server-starter/src/main/resources/banner.txt +++ b/openjob-server/openjob-server-starter/src/main/resources/banner.txt @@ -5,7 +5,7 @@ ${AnsiColor.GREEN} | |_| | | |_) | | __/ | | | | | | | (_) | | |_) | \___/ | .__/ \___| |_| |_| _/ | \___/ |_.__/ |_| |__/ - :: Version :: Openjob Server(v1.0.7) Spring Boot(v${spring-boot.version}) Akka(v2.6.19) + :: Version :: Openjob Server(v1.0.8) Spring Boot(v${spring-boot.version}) Akka(v2.6.19) :: Website :: https://openjob.io :: Author :: https://github.com/stelin :: Github :: https://github.com/open-job/openjob diff --git a/openjob-server/pom.xml b/openjob-server/pom.xml index 2387748a..130341dd 100644 --- a/openjob-server/pom.xml +++ b/openjob-server/pom.xml @@ -5,7 +5,7 @@ openjob io.openjob - 1.0.7 + 1.0.8 4.0.0 openjob-server diff --git a/openjob-worker/openjob-worker-core/pom.xml b/openjob-worker/openjob-worker-core/pom.xml index f5c8249a..8b88c4fd 100644 --- a/openjob-worker/openjob-worker-core/pom.xml +++ b/openjob-worker/openjob-worker-core/pom.xml @@ -5,7 +5,7 @@ openjob-worker io.openjob.worker - 1.0.7 + 1.0.8 4.0.0 diff --git a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/processor/TaskResult.java b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/processor/TaskResult.java index 9b7b52f5..d9406987 100644 --- a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/processor/TaskResult.java +++ b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/processor/TaskResult.java @@ -1,6 +1,5 @@ package io.openjob.worker.processor; -import io.openjob.common.constant.TaskStatusEnum; import lombok.Data; /** diff --git a/openjob-worker/openjob-worker-spring-boot-starter/pom.xml b/openjob-worker/openjob-worker-spring-boot-starter/pom.xml index 450f4205..80d61519 100644 --- a/openjob-worker/openjob-worker-spring-boot-starter/pom.xml +++ b/openjob-worker/openjob-worker-spring-boot-starter/pom.xml @@ -5,7 +5,7 @@ openjob-worker io.openjob.worker - 1.0.7 + 1.0.8 4.0.0 diff --git a/openjob-worker/pom.xml b/openjob-worker/pom.xml index 2f73ed56..48552fdf 100644 --- a/openjob-worker/pom.xml +++ b/openjob-worker/pom.xml @@ -5,7 +5,7 @@ openjob io.openjob - 1.0.7 + 1.0.8 4.0.0 @@ -25,7 +25,7 @@ io.openjob.worker openjob-worker-core - 1.0.7 + 1.0.8 diff --git a/pom.xml b/pom.xml index 7370ecb1..302ced1c 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ io.openjob openjob pom - 1.0.7 + 1.0.8 Openjob Build ${version} Openjob build with Maven