From 3770968bdb4debed6fe6678e7e81b8bafb4d5ccd Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Thu, 18 May 2023 10:36:36 +0800 Subject: [PATCH 01/13] :bug:fixed add and update null --- .../server/repository/dao/impl/JobDAOImpl.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java index 700cb7bd..143b2301 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java @@ -43,7 +43,12 @@ public Long save(Job job) { job.setDeleteTime(0L); job.setCreateTime(timestamp); job.setUpdateTime(timestamp); - job.setNextExecuteTime(0L); + + //Save next execute time + if (Objects.isNull(job.getNextExecuteTime())) { + job.setNextExecuteTime(0L); + } + job.setSlotsId(SlotsUtil.getSlotsId(String.valueOf(timestamp))); job.setWorkflowId(0L); return jobRepository.save(job).getId(); @@ -75,7 +80,12 @@ public Long update(Job job) { j.setParamsType(job.getParamsType()); j.setExtendParamsType(job.getExtendParamsType()); j.setExtendParams(job.getExtendParams()); - j.setNextExecuteTime(job.getNextExecuteTime()); + + // Update next execute time. + if (Objects.nonNull(job.getNextExecuteTime())){ + j.setNextExecuteTime(job.getNextExecuteTime()); + } + j.setFailRetryInterval(job.getFailRetryInterval()); j.setFailRetryTimes(job.getFailRetryTimes()); j.setConcurrency(job.getConcurrency()); From cdbf7e99d4fd74e0ded3195c795417739b3ee640 Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Thu, 18 May 2023 15:07:10 +0800 Subject: [PATCH 02/13] :bug:fixed delay total count bug --- .../server/repository/dao/impl/DelayInstanceDAOImpl.java | 2 +- .../repository/repository/DelayInstanceRepository.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java index 5236f5d8..ab655064 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java @@ -178,7 +178,7 @@ public DelayInstance getByTaskId(String taskId) { @Override public List getTopicTotalCount(List topics, List statuses) { - return this.delayInstanceRepository.getDelayTotalCount(topics, statuses); + return this.delayInstanceRepository.getDelayTotalCount(topics, statuses, CommonConstant.NO); } @Override diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java index 0416f503..5165048b 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java @@ -34,10 +34,11 @@ public interface DelayInstanceRepository extends JpaRepository getDelayTotalCount(List topics, List statuses); + @Query(value = "SELECT new io.openjob.server.repository.dto.DelayInstanceTotalDTO(d.topic, count(d.id)) from DelayInstance as d where d.topic in (?1) and d.status in (?2) and d.deleted=?3 group by d.topic") + List getDelayTotalCount(List topics, List statuses, Integer deleted); /** * Find by task id. From 1193c3f50c376b104fee74dc1a8981490d486fef Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Thu, 18 May 2023 16:16:59 +0800 Subject: [PATCH 03/13] :bug:fixed stop delay instance bug --- .../admin/service/impl/DelayInstanceServiceImpl.java | 2 ++ .../server/repository/dao/DelayInstanceDAO.java | 9 +++++++++ .../repository/dao/impl/DelayInstanceDAOImpl.java | 5 +++++ .../server/repository/entity/DelayInstance.java | 3 +++ .../repository/repository/DelayInstanceRepository.java | 10 ++++++++++ .../scheduler/scheduler/DelayInstanceScheduler.java | 8 ++++++++ 6 files changed, 37 insertions(+) diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java index c402cf9a..41a665e9 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayInstanceServiceImpl.java @@ -77,6 +77,8 @@ public StopDelayInstanceVO stop(StopDelayInstanceRequest request) { DelayInstanceStopResponseDTO stop = this.delayInstanceScheduler.stop(delayInstanceStopRequestDTO); // Update status + this.delayInstanceDAO.updateStatus(request.getTaskId(), TaskStatusEnum.STOP.getStatus()); + StopDelayInstanceVO stopDelayInstanceVO = new StopDelayInstanceVO(); stopDelayInstanceVO.setResult(stop.getResult()); return stopDelayInstanceVO; diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayInstanceDAO.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayInstanceDAO.java index f6c30fae..22590450 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayInstanceDAO.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayInstanceDAO.java @@ -64,6 +64,15 @@ public interface DelayInstanceDAO { */ PageDTO pageList(DelayInstancePageDTO instancePageDTO); + /** + * Update status + * + * @param taskId taskId + * @param status status + * @return + */ + Integer updateStatus(String taskId, Integer status); + /** * Batch update status. * diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java index ab655064..c1162e58 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java @@ -181,6 +181,11 @@ public List getTopicTotalCount(List topics, List< return this.delayInstanceRepository.getDelayTotalCount(topics, statuses, CommonConstant.NO); } + @Override + public Integer updateStatus(String taskId, Integer status) { + return this.delayInstanceRepository.updateStatusByTaskId(taskId, status); + } + @Override public Integer batchUpdateStatus(List updateList) { // When then sql. diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/entity/DelayInstance.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/entity/DelayInstance.java index 6290e75c..10a5046d 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/entity/DelayInstance.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/entity/DelayInstance.java @@ -43,6 +43,9 @@ public class DelayInstance { @Column(name = "delay_extra") private String delayExtra; + /** + * @see io.openjob.common.constant.TaskStatusEnum + */ @Column(name = "status") private Integer status; diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java index 5165048b..ae810388 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java @@ -47,4 +47,14 @@ public interface DelayInstanceRepository extends JpaRepository Date: Thu, 18 May 2023 17:37:24 +0800 Subject: [PATCH 04/13] :zap:update worker init --- .../java/io/openjob/worker/OpenjobWorker.java | 63 ++++++++++++++----- .../io/openjob/worker/delay/DelayManager.java | 14 +++++ .../worker/init/WorkerActorSystem.java | 14 +++++ .../io/openjob/worker/init/WorkerConfig.java | 14 +++++ .../openjob/worker/init/WorkerHeartbeat.java | 17 ++++- .../openjob/worker/init/WorkerRegister.java | 16 +++++ .../openjob/worker/init/WorkerShutdown.java | 20 ++++-- 7 files changed, 137 insertions(+), 21 deletions(-) diff --git a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/OpenjobWorker.java b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/OpenjobWorker.java index e79dca48..ad4f099e 100644 --- a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/OpenjobWorker.java +++ b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/OpenjobWorker.java @@ -1,16 +1,23 @@ package io.openjob.worker; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.openjob.worker.config.OpenjobConfig; +import io.openjob.worker.constant.WorkerConstant; import io.openjob.worker.delay.DelayManager; import io.openjob.worker.init.WorkerActorSystem; import io.openjob.worker.init.WorkerConfig; -import io.openjob.worker.init.WorkerHeartbeat; import io.openjob.worker.init.WorkerContext; +import io.openjob.worker.init.WorkerHeartbeat; import io.openjob.worker.init.WorkerRegister; import io.openjob.worker.init.WorkerShutdown; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + /** * @author stelin swoft@qq.com * @since 1.0.0 @@ -45,7 +52,7 @@ public void afterPropertiesSet() { try { this.init(); } catch (Throwable throwable) { - log.error("Openjob worker initialize failed!", throwable); + log.error("Openjob worker after properties initialize failed!", throwable); } } @@ -55,22 +62,50 @@ public void afterPropertiesSet() { * @throws Exception Exception */ public synchronized void init() throws Exception { - // Initialize worker config. - this.workerConfig.init(); + // Retry interval + int retryInterval = OpenjobConfig.getInteger(WorkerConstant.WORKER_HEARTBEAT_INTERVAL, WorkerConstant.DEFAULT_WORKER_HEARTBEAT_INTERVAL); + + // Initialize executor + ScheduledThreadPoolExecutor initializeExecutor = new ScheduledThreadPoolExecutor( + 1, + new ThreadFactoryBuilder().setNameFormat("Openjob-initialize-thread").build(), + new ThreadPoolExecutor.AbortPolicy() + ); + + //Initialize continuously until complete + initializeExecutor.scheduleWithFixedDelay(() -> { + // Initialize complete + if (this.doInitialize()) { + initializeExecutor.shutdown(); + log.info("Openjob worker initialize complete!"); + } + }, 0, retryInterval, TimeUnit.SECONDS); + } + + public synchronized Boolean doInitialize() { + try { + // Initialize worker config. + this.workerConfig.init(); - // Initialize actor system. - this.workerActorSystem.init(); + // Initialize actor system. + this.workerActorSystem.init(); - // Register worker. - this.workerRegister.register(); + // Register worker. + this.workerRegister.register(); - // Initialize worker heartbeat. - this.workerHeartbeat.init(); + // Initialize worker heartbeat. + this.workerHeartbeat.init(); - // Initialize delay. - this.delayManager.init(); + // Initialize delay. + this.delayManager.init(); - // Initialize shutdown. - this.workerShutdown.init(); + // Initialize shutdown. + this.workerShutdown.init(); + + return true; + } catch (Throwable ex) { + log.error("Openjob worker initialize failed!", ex); + return false; + } } } diff --git a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/delay/DelayManager.java b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/delay/DelayManager.java index 2294dff8..4ca9fa57 100644 --- a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/delay/DelayManager.java +++ b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/delay/DelayManager.java @@ -4,6 +4,7 @@ import io.openjob.worker.init.WorkerConfig; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** @@ -23,6 +24,11 @@ public class DelayManager { */ private final AtomicLong delayVersion = new AtomicLong(0); + /** + * Initialize status + */ + private final AtomicBoolean isInit = new AtomicBoolean(false); + /** * Delay starter. */ @@ -34,12 +40,20 @@ public DelayManager() { * Init */ public void init() { + // Already initialized + if (this.isInit.get()) { + return; + } + if (WorkerConfig.getDelayEnable()) { this.delayTaskMaster.init(); // Delay task manager. DelayTaskManager.INSTANCE.init(); } + + // Initialized + this.isInit.set(true); } /** diff --git a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerActorSystem.java b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerActorSystem.java index 483a147a..c57f0471 100644 --- a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerActorSystem.java +++ b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerActorSystem.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * @author stelin swoft@qq.com @@ -37,10 +38,20 @@ public class WorkerActorSystem { */ private static ActorRef persistentRoutingRef; + /** + * Initialize status + */ + private final AtomicBoolean isInit = new AtomicBoolean(false); + /** * Init */ public void init() { + // Already initialized + if (this.isInit.get()) { + return; + } + String akkaConfigFile = OpenjobConfig.getString(WorkerConstant.WORKER_AKKA_CONFIG_FILE, WorkerConstant.DEFAULT_WORKER_AKKA_CONFIG_FILENAME); Config defaultConfig = ConfigFactory.load(akkaConfigFile); Map newConfig = new HashMap<>(16); @@ -88,6 +99,9 @@ public void init() { .withDispatcher(WorkerAkkaConstant.DISPATCHER_DELAY_MASTER); actorSystem.actorOf(delayMasterProps, AkkaConstant.WORKER_ACTOR_DELAY_MASTER); } + + // Initialized + this.isInit.set(true); } public static ActorSystem getActorSystem() { diff --git a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerConfig.java b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerConfig.java index d484fe1c..1d45f3e1 100644 --- a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerConfig.java +++ b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerConfig.java @@ -6,6 +6,7 @@ import lombok.Getter; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; /** * @author stelin swoft@qq.com @@ -49,10 +50,20 @@ public class WorkerConfig { */ private static Integer serverPort; + /** + * Initialize status + */ + private final AtomicBoolean isInit = new AtomicBoolean(false); + /** * Init */ public void init() { + // Already initialized + if (this.isInit.get()) { + return; + } + // App name. appName = OpenjobConfig.getString(WorkerConstant.WORKER_APP_NAME); if (Objects.isNull(appName)) { @@ -65,6 +76,9 @@ public void init() { delayEnable = OpenjobConfig.getBoolean(WorkerConstant.WORKER_DELAY_ENABLE, false); serverHost = OpenjobConfig.getString(WorkerConstant.SERVER_HOST, IpUtil.getLocalAddress()); serverPort = OpenjobConfig.getInteger(WorkerConstant.SERVER_PORT, WorkerConstant.DEFAULT_SERVER_PORT); + + // Initialized + this.isInit.set(true); } public static String getWorkerHost() { diff --git a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerHeartbeat.java b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerHeartbeat.java index 55c72777..5fc164d7 100644 --- a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerHeartbeat.java +++ b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerHeartbeat.java @@ -1,6 +1,5 @@ package io.openjob.worker.init; -import ch.qos.logback.core.util.ContextUtil; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.openjob.common.request.WorkerHeartbeatRequest; import io.openjob.common.response.ServerHeartbeatResponse; @@ -13,15 +12,14 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; -import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -38,6 +36,11 @@ public class WorkerHeartbeat { */ private final ScheduledExecutorService heartbeatService; + /** + * Initialize status + */ + private final AtomicBoolean isInit = new AtomicBoolean(false); + /** * New WorkerHeartbeat * @@ -56,6 +59,11 @@ public WorkerHeartbeat(OpenjobWorker openjobWorker) { * Init */ public void init() { + // Already initialized + if (this.isInit.get()) { + return; + } + int heartbeatInterval = OpenjobConfig.getInteger(WorkerConstant.WORKER_HEARTBEAT_INTERVAL, WorkerConstant.DEFAULT_WORKER_HEARTBEAT_INTERVAL); heartbeatService.scheduleAtFixedRate(() -> { String workerAddress = WorkerConfig.getWorkerAddress(); @@ -78,6 +86,9 @@ public void init() { } }, 5, heartbeatInterval, TimeUnit.SECONDS); + + // Initialized + this.isInit.set(true); } /** diff --git a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerRegister.java b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerRegister.java index 1a7a0fbf..400e2249 100644 --- a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerRegister.java +++ b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerRegister.java @@ -8,6 +8,8 @@ import io.openjob.worker.util.WorkerUtil; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.atomic.AtomicBoolean; + /** * @author stelin swoft@qq.com * @since 1.0.0 @@ -17,14 +19,25 @@ public class WorkerRegister { private final OpenjobWorker openjobWorker; + /** + * Initialize status + */ + private final AtomicBoolean isInit = new AtomicBoolean(false); + public WorkerRegister(OpenjobWorker openjobWorker) { this.openjobWorker = openjobWorker; } + /** * Register */ public void register() { + // Already initialized + if (this.isInit.get()) { + return; + } + String serverAddress = WorkerConfig.getServerHost(); WorkerStartRequest startReq = new WorkerStartRequest(); @@ -38,6 +51,9 @@ public void register() { // Do register. this.doRegister(response); + + // Initialized + this.isInit.set(true); } catch (Throwable e) { log.error("Register worker fail. serverAddress={} workerAddress={}", serverAddress, WorkerConfig.getWorkerAddress()); throw e; diff --git a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerShutdown.java b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerShutdown.java index e2012236..cb9389b3 100644 --- a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerShutdown.java +++ b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerShutdown.java @@ -1,14 +1,14 @@ package io.openjob.worker.init; import io.openjob.common.request.WorkerStopRequest; -import io.openjob.common.response.Result; import io.openjob.common.response.ServerResponse; import io.openjob.common.util.FutureUtil; -import io.openjob.common.util.ResultUtil; import io.openjob.worker.OpenjobWorker; import io.openjob.worker.util.WorkerUtil; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.atomic.AtomicBoolean; + /** * @author stelin swoft@qq.com * @since 1.0.0 @@ -17,20 +17,32 @@ public class WorkerShutdown { private final OpenjobWorker openjobWorker; + /** + * Initialize status + */ + private final AtomicBoolean isInit = new AtomicBoolean(false); + public WorkerShutdown(OpenjobWorker openjobWorker) { this.openjobWorker = openjobWorker; } public void init() { + // Already initialized + if (this.isInit.get()) { + return; + } + Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); + + // Initialized + this.isInit.set(true); } /** * Openjob worker stop. * - * @throws Exception exception */ - private void stop() throws Exception { + private void stop() { String serverAddress = WorkerConfig.getServerHost(); String workerAddress = WorkerConfig.getWorkerAddress(); String appName = WorkerConfig.getAppName(); From a0524130abd806021b2734798975bf889afe1289 Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Thu, 18 May 2023 21:00:23 +0800 Subject: [PATCH 05/13] :art:add code --- .../server/admin/constant/CodeEnum.java | 22 +++++++++---------- .../admin/service/impl/AppServiceImpl.java | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java index eeed30bc..65deb96e 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java @@ -5,11 +5,11 @@ import lombok.Getter; /** - * user: 100+ - * namespace: 200+ - * application: 300+ - * job: 400+ - * delay: 500+ + * User: 100+ + * Namespace: 200+ + * Application: 300+ + * Job: 400+ + * Delay: 500+ * * @author stelin swoft@qq.com * @since 1.0.0 @@ -17,12 +17,12 @@ @Getter @AllArgsConstructor public enum CodeEnum implements CodeExceptionAssert { - - /** - * App name not exist - */ - NAME_EXIST(100, "App name must be globally unique!"), - + // Code list + USER_EXIST(100, "User is exist!"), + NAMESPACE_EXIST(200, "App name must be globally unique!"), + NAMESPACE_DELETE_INVALID(201, "Namespace can not be delete!"), + APP_NAME_EXIST(300, "App name must be globally unique!"), + APP_DELETE_INVALID(301, "Application can not be deleted!"), TIME_EXPRESSION_INVALID(400, "Time expression is invalid"); /** diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java index ba95fac8..77196bd4 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java @@ -47,7 +47,7 @@ public AppServiceImpl(AppDAO appDAO, NamespaceDAO namespaceDAO) { @Override public AddAppVO add(AddAppRequest addRequest) { App app = this.appDAO.getAppByName(addRequest.getName()); - CodeEnum.NAME_EXIST.assertIsTrue(Objects.isNull(app)); + CodeEnum.APP_NAME_EXIST.assertIsTrue(Objects.isNull(app)); Long id = this.appDAO.save(BeanMapperUtil.map(addRequest, App.class)); @@ -61,7 +61,7 @@ public UpdateAppVO update(UpdateAppRequest updateRequest) { // App name is exist and not self! App nameApp = this.appDAO.getAppByName(updateRequest.getName()); if (Objects.nonNull(nameApp) && !nameApp.getId().equals(updateRequest.getId())) { - CodeEnum.NAME_EXIST.throwException(); + CodeEnum.APP_NAME_EXIST.throwException(); } App app = BeanMapperUtil.map(BeanMapperUtil.map(updateRequest, App.class), App.class); From c3b5661aeb91feaef8b6b27e66163b3178d433b2 Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Thu, 18 May 2023 21:20:24 +0800 Subject: [PATCH 06/13] :zap:add code --- .../openjob/server/admin/constant/CodeEnum.java | 16 ++++++++++++++-- .../admin/service/impl/DelayServiceImpl.java | 9 +++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java index 65deb96e..ceeb7e2c 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java @@ -19,11 +19,23 @@ public enum CodeEnum implements CodeExceptionAssert { // Code list USER_EXIST(100, "User is exist!"), - NAMESPACE_EXIST(200, "App name must be globally unique!"), + + // Namespace NAMESPACE_DELETE_INVALID(201, "Namespace can not be delete!"), + + // Application APP_NAME_EXIST(300, "App name must be globally unique!"), APP_DELETE_INVALID(301, "Application can not be deleted!"), - TIME_EXPRESSION_INVALID(400, "Time expression is invalid"); + + // Job + TIME_EXPRESSION_INVALID(400, "Time expression is invalid"), + JOB_DELETE_INVALID(401, "Job can not be deleted!"), + + // Delay + DELAY_TOPIC_EXIST(500, "Topic is exist!"), + DELAY_DELETE_INVALID(501, "Delay can not be deleted!"), + ; + /** * Value diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayServiceImpl.java index 3cf7283e..21a34fb4 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayServiceImpl.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayServiceImpl.java @@ -3,6 +3,7 @@ import io.openjob.common.constant.CommonConstant; import io.openjob.common.constant.TaskStatusEnum; import io.openjob.common.util.DelayUtil; +import io.openjob.server.admin.constant.CodeEnum; import io.openjob.server.admin.request.delay.AddDelayRequest; import io.openjob.server.admin.request.delay.DeleteDelayRequest; import io.openjob.server.admin.request.delay.ListDelayRequest; @@ -65,6 +66,9 @@ public DelayServiceImpl(DelayDAO delayDAO, AppDAO appDAO, DelayInstanceDAO delay @Override @Transactional(rollbackFor = Exception.class) public AddDelayVO add(AddDelayRequest addRequest) { + Delay byTopic = this.delayDAO.findByTopic(addRequest.getTopic()); + CodeEnum.DELAY_TOPIC_EXIST.assertIsTrue(Objects.isNull(byTopic)); + // Delay Delay delay = BeanMapperUtil.map(addRequest, Delay.class); delay.setPid(0L); @@ -150,6 +154,11 @@ public DeleteDelayVO delete(DeleteDelayRequest deleteDelayRequest) { @Override @Transactional(rollbackFor = Exception.class) public UpdateDelayVO update(UpdateDelayRequest updateDelayRequest) { + Delay byTopic = this.delayDAO.findByTopic(updateDelayRequest.getTopic()); + if (Objects.nonNull(byTopic) && !byTopic.getId().equals(updateDelayRequest.getId())) { + CodeEnum.DELAY_TOPIC_EXIST.throwException(); + } + // Delay Delay delay = BeanMapperUtil.map(updateDelayRequest, Delay.class); this.delayDAO.update(delay); From 5304ac7aea720a891c00f20ac0fee9e99dc6f60c Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Fri, 19 May 2023 11:54:34 +0800 Subject: [PATCH 07/13] :bug:fixed delete bug --- .../admin/service/impl/AppServiceImpl.java | 24 ++++- .../admin/service/impl/DelayServiceImpl.java | 9 +- .../admin/service/impl/JobServiceImpl.java | 10 +- .../service/impl/NamespaceServiceImpl.java | 11 +- .../openjob/server/repository/dao/AppDAO.java | 23 ++++ .../server/repository/dao/DelayDAO.java | 16 +++ .../repository/dao/DelayInstanceDAO.java | 19 ++-- .../openjob/server/repository/dao/JobDAO.java | 9 ++ .../server/repository/dao/JobInstanceDAO.java | 8 ++ .../repository/dao/impl/AppDAOImpl.java | 15 +++ .../repository/dao/impl/DelayDAOImpl.java | 57 ++++++---- .../dao/impl/DelayInstanceDAOImpl.java | 100 +++++++++--------- .../repository/dao/impl/JobDAOImpl.java | 14 ++- .../dao/impl/JobInstanceDAOImpl.java | 15 ++- .../repository/repository/AppRepository.java | 9 ++ .../repository/DelayInstanceRepository.java | 9 ++ .../repository/JobInstanceRepository.java | 20 +++- 17 files changed, 270 insertions(+), 98 deletions(-) diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java index 77196bd4..67ada701 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/AppServiceImpl.java @@ -16,8 +16,12 @@ import io.openjob.server.common.util.PageUtil; import io.openjob.server.common.vo.PageVO; import io.openjob.server.repository.dao.AppDAO; +import io.openjob.server.repository.dao.DelayDAO; +import io.openjob.server.repository.dao.JobDAO; import io.openjob.server.repository.dao.NamespaceDAO; import io.openjob.server.repository.entity.App; +import io.openjob.server.repository.entity.Delay; +import io.openjob.server.repository.entity.Job; import io.openjob.server.repository.entity.Namespace; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -37,11 +41,16 @@ public class AppServiceImpl implements AppService { private final AppDAO appDAO; private final NamespaceDAO namespaceDAO; + private final JobDAO jobDAO; + private final DelayDAO delayDAO; + @Autowired - public AppServiceImpl(AppDAO appDAO, NamespaceDAO namespaceDAO) { + public AppServiceImpl(AppDAO appDAO, NamespaceDAO namespaceDAO, JobDAO jobDAO, DelayDAO delayDAO) { this.appDAO = appDAO; this.namespaceDAO = namespaceDAO; + this.jobDAO = jobDAO; + this.delayDAO = delayDAO; } @Override @@ -71,9 +80,16 @@ public UpdateAppVO update(UpdateAppRequest updateRequest) { @Override public DeleteAppVO delete(DeleteAppRequest deleteAppRequest) { - App app = BeanMapperUtil.map(deleteAppRequest, App.class); - app.setDeleted(CommonConstant.YES); - this.appDAO.update(app); + App byId = this.appDAO.getById(deleteAppRequest.getId()); + + // Job/delay/workflow + Job firstJob = this.jobDAO.getFirstByNamespaceAndAppid(byId.getNamespaceId(), byId.getId()); + Delay firstDelay = this.delayDAO.getFirstByNamespaceAndAppid(byId.getNamespaceId(), byId.getId()); + if (Objects.nonNull(firstJob) || Objects.nonNull(firstDelay)) { + CodeEnum.APP_DELETE_INVALID.throwException(); + } + + this.appDAO.deleteById(deleteAppRequest.getId()); return new DeleteAppVO(); } diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayServiceImpl.java index 21a34fb4..65ecc7df 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayServiceImpl.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/DelayServiceImpl.java @@ -143,8 +143,13 @@ public PageVO list(ListDelayRequest listDelayRequest) { @Override @Transactional(rollbackFor = Exception.class) public DeleteDelayVO delete(DeleteDelayRequest deleteDelayRequest) { - this.delayDAO.updateStatusOrDeleted(deleteDelayRequest.getId(), null, CommonConstant.YES); - this.delayDAO.updateStatusOrDeleted(deleteDelayRequest.getCid(), null, CommonConstant.YES); + if (Objects.nonNull(this.delayInstanceDAO.getFirstByDelayId(deleteDelayRequest.getId()))) { + CodeEnum.DELAY_DELETE_INVALID.throwException(); + } + + // Delete + this.delayDAO.deleteById(deleteDelayRequest.getId()); + this.delayDAO.deleteById(deleteDelayRequest.getCid()); // Refresh delay version this.delayScheduler.refreshDelayVersion(); diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java index 57dc3cf8..1ebbb41e 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java @@ -28,12 +28,14 @@ import io.openjob.server.repository.constant.JobStatusEnum; import io.openjob.server.repository.dao.AppDAO; import io.openjob.server.repository.dao.JobDAO; +import io.openjob.server.repository.dao.JobInstanceDAO; import io.openjob.server.repository.dto.JobPageDTO; import io.openjob.server.repository.entity.App; import io.openjob.server.repository.entity.Job; import io.openjob.server.scheduler.dto.JobExecuteRequestDTO; import io.openjob.server.scheduler.service.JobSchedulingService; import lombok.extern.slf4j.Slf4j; +import org.aspectj.apache.bcel.classfile.Code; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; @@ -56,12 +58,14 @@ public class JobServiceImpl implements JobService { private final JobDAO jobDAO; private final AppDAO appDAO; + private final JobInstanceDAO jobInstanceDAO; private final JobSchedulingService jobSchedulingService; @Autowired - public JobServiceImpl(JobDAO jobDAO, AppDAO appDAO, JobSchedulingService jobSchedulingService) { + public JobServiceImpl(JobDAO jobDAO, AppDAO appDAO, JobInstanceDAO jobInstanceDAO, JobSchedulingService jobSchedulingService) { this.jobDAO = jobDAO; this.appDAO = appDAO; + this.jobInstanceDAO = jobInstanceDAO; this.jobSchedulingService = jobSchedulingService; } @@ -107,6 +111,10 @@ public UpdateJobVO update(UpdateJobRequest updateJobRequest) { @Override public DeleteJobVO delete(DeleteJobRequest deleteJobRequest) { + if (Objects.nonNull(this.jobInstanceDAO.getFirstByJobId(deleteJobRequest.getId()))) { + CodeEnum.JOB_DELETE_INVALID.throwException(); + } + this.jobDAO.updateByStatusOrDeleted(deleteJobRequest.getId(), null, CommonConstant.YES, null); return new DeleteJobVO(); } diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/NamespaceServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/NamespaceServiceImpl.java index ac977963..d0aa97a1 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/NamespaceServiceImpl.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/NamespaceServiceImpl.java @@ -1,6 +1,7 @@ package io.openjob.server.admin.service.impl; import io.openjob.common.constant.CommonConstant; +import io.openjob.server.admin.constant.CodeEnum; import io.openjob.server.admin.request.namespace.AddNamespaceRequest; import io.openjob.server.admin.request.namespace.DeleteNamespaceRequest; import io.openjob.server.admin.request.namespace.ListNamespaceRequest; @@ -14,11 +15,13 @@ import io.openjob.server.common.util.BeanMapperUtil; import io.openjob.server.common.util.PageUtil; import io.openjob.server.common.vo.PageVO; +import io.openjob.server.repository.dao.AppDAO; import io.openjob.server.repository.dao.NamespaceDAO; import io.openjob.server.repository.entity.Namespace; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Objects; import java.util.UUID; /** @@ -28,10 +31,12 @@ @Service public class NamespaceServiceImpl implements NamespaceService { private final NamespaceDAO namespaceDAO; + private final AppDAO appDAO; @Autowired - public NamespaceServiceImpl(NamespaceDAO namespaceDAO) { + public NamespaceServiceImpl(NamespaceDAO namespaceDAO, AppDAO appDAO) { this.namespaceDAO = namespaceDAO; + this.appDAO = appDAO; } @Override @@ -53,6 +58,10 @@ public UpdateNamespaceVO update(UpdateNamespaceRequest updateRequest) { @Override public DeleteNamespaceVO delete(DeleteNamespaceRequest deleteNamespaceRequest) { + if (Objects.nonNull(this.appDAO.getFirstByNamespaceId(deleteNamespaceRequest.getId()))) { + CodeEnum.NAMESPACE_DELETE_INVALID.throwException(); + } + Namespace namespace = BeanMapperUtil.map(deleteNamespaceRequest, Namespace.class); namespace.setDeleted(CommonConstant.YES); this.namespaceDAO.update(namespace); diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/AppDAO.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/AppDAO.java index 653acd7a..4a59b4a3 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/AppDAO.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/AppDAO.java @@ -19,6 +19,13 @@ public interface AppDAO { */ Long save(App app); + /** + * Delete by id + * + * @param id id + */ + void deleteById(Long id); + /** * Update * @@ -35,6 +42,22 @@ public interface AppDAO { */ App getAppByName(String appName); + /** + * Get by id + * + * @param id id + * @return App + */ + App getById(Long id); + + /** + * Get first by namespace id + * + * @param namespaceId namespaceId + * @return App + */ + App getFirstByNamespaceId(Long namespaceId); + /** * Get by ids. * diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayDAO.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayDAO.java index 1be0f3e6..6a1726eb 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayDAO.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayDAO.java @@ -21,6 +21,13 @@ public interface DelayDAO { */ Long save(Delay delay); + /** + * Delete by id + * + * @param id id + */ + void deleteById(Long id); + /** * Update * @@ -85,4 +92,13 @@ public interface DelayDAO { * @return list */ List findByAppId(Long appId); + + /** + * Get first by namespace id and appid + * + * @param namespaceId namespaceId + * @param appId appId + * @return Delay + */ + Delay getFirstByNamespaceAndAppid(Long namespaceId, Long appId); } diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayInstanceDAO.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayInstanceDAO.java index 22590450..c792838f 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayInstanceDAO.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/DelayInstanceDAO.java @@ -3,6 +3,7 @@ import io.openjob.server.common.dto.PageDTO; import io.openjob.server.repository.dto.DelayInstancePageDTO; import io.openjob.server.repository.dto.DelayInstanceTotalDTO; +import io.openjob.server.repository.entity.Delay; import io.openjob.server.repository.entity.DelayInstance; import java.util.List; @@ -21,16 +22,6 @@ public interface DelayInstanceDAO { */ Integer batchSave(List delayInstanceList); - /** - * List delay instance. - * - * @param slotIds slotIds - * @param time time - * @param size size - * @return List - */ - List listDelayInstance(List slotIds, Integer time, Integer size); - /** * Get by task id. * @@ -88,4 +79,12 @@ public interface DelayInstanceDAO { * @return Integer */ Integer deleteByTaskIds(List taskIds); + + /** + * Get first by delay id + * + * @param delayId delayId + * @return DelayInstance + */ + DelayInstance getFirstByDelayId(Long delayId); } diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/JobDAO.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/JobDAO.java index e9ede9dd..a8fccfa4 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/JobDAO.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/JobDAO.java @@ -56,6 +56,15 @@ public interface JobDAO { */ Job getById(Long id); + /** + * Get first by namespace id and appid + * + * @param namespaceId namespaceId + * @param appId appId + * @return Job + */ + Job getFirstByNamespaceAndAppid(Long namespaceId, Long appId); + /** * List scheduled jobs. * diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/JobInstanceDAO.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/JobInstanceDAO.java index 8cb828ab..d854a96b 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/JobInstanceDAO.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/JobInstanceDAO.java @@ -95,6 +95,14 @@ public interface JobInstanceDAO { */ JobInstance getOneByJobIdAndStatus(Long jobId, Long id, List statusList); + /** + * Get first by job id + * + * @param jobId jobId + * @return JobInstance + */ + JobInstance getFirstByJobId(Long jobId); + /** * Get page list. * diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/AppDAOImpl.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/AppDAOImpl.java index fb13c519..5e6b06a2 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/AppDAOImpl.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/AppDAOImpl.java @@ -43,6 +43,11 @@ public Long save(App app) { return appRepository.save(app).getId(); } + @Override + public void deleteById(Long id) { + this.appRepository.deleteById(id); + } + @Override public Long update(App app) { this.appRepository.findById(app.getId()) @@ -77,6 +82,16 @@ public App getAppByName(String appName) { return appRepository.findAppByName(appName); } + @Override + public App getById(Long id) { + return this.appRepository.findById(id).orElse(null); + } + + @Override + public App getFirstByNamespaceId(Long namespaceId) { + return this.appRepository.findFirstByNamespaceIdAndDeleted(namespaceId, CommonConstant.NO); + } + @Override public List getByIds(List ids) { return this.appRepository.findAllById(ids); diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayDAOImpl.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayDAOImpl.java index 1f744d94..28fd590e 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayDAOImpl.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayDAOImpl.java @@ -43,6 +43,11 @@ public Long save(Delay delay) { return this.delayRepository.save(delay).getId(); } + @Override + public void deleteById(Long id) { + this.delayRepository.deleteById(id); + } + @Override public Long update(Delay delay) { this.delayRepository.findById(delay.getId()) @@ -89,6 +94,38 @@ public void updateStatusOrDeleted(Long id, Integer status, Integer deleted) { }); } + @Override + public Delay findByTopic(String topic) { + return this.delayRepository.findByTopic(topic); + } + + @Override + public Optional findById(Long id) { + return this.delayRepository.findById(id); + } + + @Override + public List findByTopics(List topics) { + return this.delayRepository.findByTopicIn(topics); + } + + @Override + public List findByAppId(Long appId) { + return this.delayRepository.findByAppIdAndDeleted(appId, CommonConstant.NO); + } + + @Override + public Delay getFirstByNamespaceAndAppid(Long namespaceId, Long appId) { + Delay delay = new Delay(); + delay.setNamespaceId(namespaceId); + delay.setDeleted(CommonConstant.NO); + + if (Objects.nonNull(appId)) { + delay.setAppId(appId); + } + return this.delayRepository.findOne(Example.of(delay)).orElse(null); + } + @Override public PageDTO pageList(DelayPageDTO delayPageDTO) { // Matcher @@ -136,24 +173,4 @@ public PageDTO pageList(DelayPageDTO delayPageDTO) { } return pageDTO; } - - @Override - public Delay findByTopic(String topic) { - return this.delayRepository.findByTopic(topic); - } - - @Override - public Optional findById(Long id) { - return this.delayRepository.findById(id); - } - - @Override - public List findByTopics(List topics) { - return this.delayRepository.findByTopicIn(topics); - } - - @Override - public List findByAppId(Long appId) { - return this.delayRepository.findByAppIdAndDeleted(appId, CommonConstant.NO); - } } diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java index c1162e58..c765243e 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayInstanceDAOImpl.java @@ -7,7 +7,6 @@ import io.openjob.server.repository.dto.DelayInstancePageDTO; import io.openjob.server.repository.dto.DelayInstanceTotalDTO; import io.openjob.server.repository.entity.DelayInstance; -import io.openjob.server.repository.entity.JobInstance; import io.openjob.server.repository.repository.DelayInstanceRepository; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -104,6 +103,55 @@ public void updateDeleted(String taskid, Integer deleted) { }); } + @Override + public DelayInstance getByTaskId(String taskId) { + return this.delayInstanceRepository.findByTaskId(taskId); + } + + @Override + public List getTopicTotalCount(List topics, List statuses) { + return this.delayInstanceRepository.getDelayTotalCount(topics, statuses, CommonConstant.NO); + } + + @Override + public DelayInstance getFirstByDelayId(Long delayId) { + return this.delayInstanceRepository.findFirstByDelayIdAndDeleted(delayId, CommonConstant.NO); + } + + @Override + public Integer updateStatus(String taskId, Integer status) { + return this.delayInstanceRepository.updateStatusByTaskId(taskId, status); + } + + @Override + public Integer batchUpdateStatus(List updateList) { + // When then sql. + StringBuilder statusWhenThen = new StringBuilder(); + StringBuilder addressWhenThen = new StringBuilder(); + StringBuilder completeWhenThen = new StringBuilder(); + updateList.forEach(d -> { + statusWhenThen.append(String.format(" when '%s' then %d ", d.getTaskId(), d.getStatus())); + addressWhenThen.append(String.format(" when '%s' then '%s' ", d.getTaskId(), d.getWorkerAddress())); + completeWhenThen.append(String.format(" when '%s' then '%s' ", d.getTaskId(), d.getCompleteTime())); + }); + + // Update sql. + String sql = String.format("update `delay_instance` set `worker_address`=(case `task_id` %s ELSE `worker_address` END)," + + "`complete_time`=(case `task_id` %s ELSE `complete_time` END),`update_time`=%d, " + + "`status`=(case `task_id` %s ELSE `status` END) where `status`< (case `task_id` %s ELSE `status` END)", + addressWhenThen, + completeWhenThen, + DateUtil.timestamp(), + statusWhenThen, + statusWhenThen); + return this.jdbcTemplate.update(sql); + } + + @Override + public Integer deleteByTaskIds(List taskIds) { + return this.delayInstanceRepository.batchDeleteByTaskIds(taskIds, CommonConstant.YES, DateUtil.timestamp()); + } + @Override public PageDTO pageList(DelayInstancePageDTO instancePageDTO) { @@ -164,54 +212,4 @@ public PageDTO pageList(DelayInstancePageDTO instancePageDTO) { } return pageDTO; } - - @Override - public List listDelayInstance(List slotIds, Integer time, Integer size) { - DelayInstance delayInstance = new DelayInstance(); - return this.delayInstanceRepository.findAll(Example.of(delayInstance), PageRequest.of(0, size, Sort.by("id"))).toList(); - } - - @Override - public DelayInstance getByTaskId(String taskId) { - return this.delayInstanceRepository.findByTaskId(taskId); - } - - @Override - public List getTopicTotalCount(List topics, List statuses) { - return this.delayInstanceRepository.getDelayTotalCount(topics, statuses, CommonConstant.NO); - } - - @Override - public Integer updateStatus(String taskId, Integer status) { - return this.delayInstanceRepository.updateStatusByTaskId(taskId, status); - } - - @Override - public Integer batchUpdateStatus(List updateList) { - // When then sql. - StringBuilder statusWhenThen = new StringBuilder(); - StringBuilder addressWhenThen = new StringBuilder(); - StringBuilder completeWhenThen = new StringBuilder(); - updateList.forEach(d -> { - statusWhenThen.append(String.format(" when '%s' then %d ", d.getTaskId(), d.getStatus())); - addressWhenThen.append(String.format(" when '%s' then '%s' ", d.getTaskId(), d.getWorkerAddress())); - completeWhenThen.append(String.format(" when '%s' then '%s' ", d.getTaskId(), d.getCompleteTime())); - }); - - // Update sql. - String sql = String.format("update `delay_instance` set `worker_address`=(case `task_id` %s ELSE `worker_address` END)," - + "`complete_time`=(case `task_id` %s ELSE `complete_time` END),`update_time`=%d, " - + "`status`=(case `task_id` %s ELSE `status` END) where `status`< (case `task_id` %s ELSE `status` END)", - addressWhenThen, - completeWhenThen, - DateUtil.timestamp(), - statusWhenThen, - statusWhenThen); - return this.jdbcTemplate.update(sql); - } - - @Override - public Integer deleteByTaskIds(List taskIds) { - return this.delayInstanceRepository.batchDeleteByTaskIds(taskIds, CommonConstant.YES, DateUtil.timestamp()); - } } diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java index 143b2301..fd516ae2 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java @@ -82,7 +82,7 @@ public Long update(Job job) { j.setExtendParams(job.getExtendParams()); // Update next execute time. - if (Objects.nonNull(job.getNextExecuteTime())){ + if (Objects.nonNull(job.getNextExecuteTime())) { j.setNextExecuteTime(job.getNextExecuteTime()); } @@ -122,6 +122,18 @@ public Job getById(Long id) { return this.jobRepository.findById(id).orElse(null); } + @Override + public Job getFirstByNamespaceAndAppid(Long namespaceId, Long appId) { + Job job = new Job(); + job.setNamespaceId(namespaceId); + job.setDeleted(CommonConstant.NO); + + if (Objects.nonNull(appId)) { + job.setAppId(appId); + } + return this.jobRepository.findOne(Example.of(job)).orElse(null); + } + @Override public List listScheduledJobs(List slotIds, Long time) { List notTypes = Arrays.asList(TimeExpressionTypeEnum.NONE.getType(), TimeExpressionTypeEnum.SECOND_DELAY.getType()); diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobInstanceDAOImpl.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobInstanceDAOImpl.java index 1f7c2ab6..9c8dac5c 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobInstanceDAOImpl.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobInstanceDAOImpl.java @@ -5,12 +5,14 @@ import io.openjob.common.constant.TimeExpressionTypeEnum; import io.openjob.common.util.DateUtil; import io.openjob.server.common.dto.PageDTO; +import io.openjob.server.repository.dao.JobDAO; import io.openjob.server.repository.dao.JobInstanceDAO; import io.openjob.server.repository.dto.JobInstancePageDTO; import io.openjob.server.repository.entity.JobInstance; import io.openjob.server.repository.repository.JobInstanceRepository; import org.apache.commons.lang3.math.NumberUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Example; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; @@ -65,13 +67,13 @@ public JobInstance getById(Long id) { @Override public List getUnDispatchedList(Set slotsIds, Long executeTime, InstanceStatusEnum status) { - return this.jobInstanceRepository.findByExecuteTimeLessThanAndSlotsIdInAndStatus(executeTime, slotsIds, status.getStatus()); + return this.jobInstanceRepository.findByExecuteTimeLessThanAndSlotsIdInAndStatusAndDeleted(executeTime, slotsIds, status.getStatus(), CommonConstant.NO); } @Override public List getFailoverList(Set slotsIds, Long lastReportTime, InstanceStatusEnum statusEnum) { - return this.jobInstanceRepository.findByLastReportTimeLessThanAndSlotsIdInAndStatusAndTimeExpressionTypeNot( - lastReportTime, slotsIds, statusEnum.getStatus(), TimeExpressionTypeEnum.ONE_TIME.name()); + return this.jobInstanceRepository.findByLastReportTimeLessThanAndSlotsIdInAndStatusAndTimeExpressionTypeNotAndDeleted( + lastReportTime, slotsIds, statusEnum.getStatus(), TimeExpressionTypeEnum.ONE_TIME.name(), CommonConstant.NO); } @Override @@ -81,7 +83,12 @@ public Integer updateByRunning(Long id, String workerAddress, InstanceStatusEnum @Override public JobInstance getOneByJobIdAndStatus(Long jobId, Long id, List statusList) { - return this.jobInstanceRepository.findFirstByJobIdAndIdNotAndStatusIn(jobId, id, statusList); + return this.jobInstanceRepository.findFirstByJobIdAndIdNotAndStatusInAndDeleted(jobId, id, statusList, CommonConstant.NO); + } + + @Override + public JobInstance getFirstByJobId(Long jobId) { + return this.jobInstanceRepository.findFirstByJobIdAndDeleted(jobId, CommonConstant.NO); } @Override diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/AppRepository.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/AppRepository.java index 530e4e06..baa99179 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/AppRepository.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/AppRepository.java @@ -16,4 +16,13 @@ public interface AppRepository extends JpaRepository { * @return App */ App findAppByName(String name); + + /** + * Find first by namespace id + * + * @param namespaceId namespaceId + * @param deleted deleted + * @return App + */ + App findFirstByNamespaceIdAndDeleted(Long namespaceId, Integer deleted); } \ No newline at end of file diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java index ae810388..44215f46 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java @@ -57,4 +57,13 @@ public interface DelayInstanceRepository extends JpaRepository, * @param slotsIds slots ids * @param status status * @param type type + * @param deleted deleted * @return List */ - List findByLastReportTimeLessThanAndSlotsIdInAndStatusAndTimeExpressionTypeNot( - Long lastReportTime, Set slotsIds, Integer status, String type); + List findByLastReportTimeLessThanAndSlotsIdInAndStatusAndTimeExpressionTypeNotAndDeleted( + Long lastReportTime, Set slotsIds, Integer status, String type, Integer deleted); /** * Find not dispatch list. @@ -74,9 +75,10 @@ List findByLastReportTimeLessThanAndSlotsIdInAndStatusAndTimeExpres * @param executeTime execute time * @param slotsIds slots ids. * @param status status. + * @param deleted deleted * @return list */ - List findByExecuteTimeLessThanAndSlotsIdInAndStatus(Long executeTime, Set slotsIds, Integer status); + List findByExecuteTimeLessThanAndSlotsIdInAndStatusAndDeleted(Long executeTime, Set slotsIds, Integer status, Integer deleted); /** * Find first by id and status. @@ -84,7 +86,17 @@ List findByLastReportTimeLessThanAndSlotsIdInAndStatusAndTimeExpres * @param jobId jobId * @param id id * @param statusList statusList + * @param deleted deleted * @return JobInstance */ - JobInstance findFirstByJobIdAndIdNotAndStatusIn(Long jobId, Long id, List statusList); + JobInstance findFirstByJobIdAndIdNotAndStatusInAndDeleted(Long jobId, Long id, List statusList, Integer deleted); + + /** + * Find first by job id and deleted + * + * @param jobId jobId + * @param deleted deleted + * @return JobInstance + */ + JobInstance findFirstByJobIdAndDeleted(Long jobId, Integer deleted); } From b50b7ae17e99cd0aa6ac7fe5ca1632ca6e8deeb5 Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Fri, 19 May 2023 14:03:13 +0800 Subject: [PATCH 08/13] :bug:fixed delete bug --- .../main/java/io/openjob/server/admin/constant/CodeEnum.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java index ceeb7e2c..78849ed9 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java @@ -21,7 +21,7 @@ public enum CodeEnum implements CodeExceptionAssert { USER_EXIST(100, "User is exist!"), // Namespace - NAMESPACE_DELETE_INVALID(201, "Namespace can not be delete!"), + NAMESPACE_DELETE_INVALID(200, "Namespace can not be delete!"), // Application APP_NAME_EXIST(300, "App name must be globally unique!"), From a681ff6fba453d632fe4a5efe50310cc8dd37c5a Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Fri, 19 May 2023 14:31:44 +0800 Subject: [PATCH 09/13] :bug:fixed findOne bug --- .../server/repository/dao/impl/DelayDAOImpl.java | 10 ++-------- .../openjob/server/repository/dao/impl/JobDAOImpl.java | 9 +-------- .../server/repository/repository/DelayRepository.java | 10 ++++++++++ .../server/repository/repository/JobRepository.java | 10 ++++++++++ 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayDAOImpl.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayDAOImpl.java index 28fd590e..e0458e0d 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayDAOImpl.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/DelayDAOImpl.java @@ -16,6 +16,7 @@ import org.springframework.data.domain.Sort; import org.springframework.stereotype.Component; +import javax.print.attribute.standard.MediaSize; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -116,14 +117,7 @@ public List findByAppId(Long appId) { @Override public Delay getFirstByNamespaceAndAppid(Long namespaceId, Long appId) { - Delay delay = new Delay(); - delay.setNamespaceId(namespaceId); - delay.setDeleted(CommonConstant.NO); - - if (Objects.nonNull(appId)) { - delay.setAppId(appId); - } - return this.delayRepository.findOne(Example.of(delay)).orElse(null); + return this.delayRepository.findFirstByNamespaceIdAndAppIdAndDeleted(namespaceId, appId, CommonConstant.NO); } @Override diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java index fd516ae2..4e412296 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java @@ -124,14 +124,7 @@ public Job getById(Long id) { @Override public Job getFirstByNamespaceAndAppid(Long namespaceId, Long appId) { - Job job = new Job(); - job.setNamespaceId(namespaceId); - job.setDeleted(CommonConstant.NO); - - if (Objects.nonNull(appId)) { - job.setAppId(appId); - } - return this.jobRepository.findOne(Example.of(job)).orElse(null); + return this.jobRepository.findFirstByNamespaceIdAndAppIdAndDeleted(namespaceId, appId, CommonConstant.NO); } @Override diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayRepository.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayRepository.java index aa6de930..34c2ef24 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayRepository.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayRepository.java @@ -35,4 +35,14 @@ public interface DelayRepository extends JpaRepository { * @return List */ List findByAppIdAndDeleted(Long appId, Integer deleted); + + /** + * Find first by namespace and appid + * + * @param namespaceId namespaceId + * @param appId appId + * @param deleted deleted + * @return Delay + */ + Delay findFirstByNamespaceIdAndAppIdAndDeleted(Long namespaceId, Long appId, Integer deleted); } diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/JobRepository.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/JobRepository.java index 4a1608ba..8c298847 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/JobRepository.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/JobRepository.java @@ -21,4 +21,14 @@ public interface JobRepository extends JpaRepository { * @return jobs */ List findBySlotsIdInAndStatusAndTimeExpressionTypeNotInAndNextExecuteTimeLessThanEqualAndDeleted(List slotIds, Integer status, List types, Long time, Integer deleted); + + /** + * Find first by namespace id and app id + * + * @param namespaceId namespaceId + * @param appId appId + * @param deleted deleted + * @return Job + */ + Job findFirstByNamespaceIdAndAppIdAndDeleted(Long namespaceId, Long appId, Integer deleted); } From aa5a9c133760d7fa2826beb9566c7a96ef0d3d27 Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Fri, 19 May 2023 15:24:06 +0800 Subject: [PATCH 10/13] :bug:fixed onetime --- .../server/repository/dao/impl/JobDAOImpl.java | 2 +- .../scheduler/service/JobSchedulingService.java | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java index 4e412296..c0f624ef 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/dao/impl/JobDAOImpl.java @@ -85,7 +85,7 @@ public Long update(Job job) { if (Objects.nonNull(job.getNextExecuteTime())) { j.setNextExecuteTime(job.getNextExecuteTime()); } - + j.setStatus(job.getStatus()); j.setFailRetryInterval(job.getFailRetryInterval()); j.setFailRetryTimes(job.getFailRetryTimes()); j.setConcurrency(job.getConcurrency()); diff --git a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/JobSchedulingService.java b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/JobSchedulingService.java index 49d1f750..0ca2fc84 100644 --- a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/JobSchedulingService.java +++ b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/JobSchedulingService.java @@ -187,7 +187,6 @@ private void createJobInstance(List jobs) { jobInstance.setParamsType(j.getParamsType()); jobInstance.setParams(j.getParams()); jobInstance.setSlotsId(j.getSlotsId()); - jobInstance.setExecuteTime(j.getNextExecuteTime()); jobInstance.setDeleteTime(0L); jobInstance.setDeleted(CommonConstant.NO); jobInstance.setCreateTime(now); @@ -209,6 +208,17 @@ private void createJobInstance(List jobs) { jobInstance.setExtendParams(j.getExtendParams()); jobInstance.setWorkflowId(j.getWorkflowId()); + if (TimeExpressionTypeEnum.isCron(j.getTimeExpressionType())) { + // Cron + jobInstance.setExecuteTime(j.getNextExecuteTime()); + } else if (TimeExpressionTypeEnum.isOneTime(j.getTimeExpressionType())) { + // One time + jobInstance.setExecuteTime(Long.valueOf(j.getTimeExpression())); + } else { + // Not supported + return; + } + Long instanceId = jobInstanceDAO.save(jobInstance); jobInstance.setId(instanceId); From ecfe27b9c80bff2e39d37848839ca72bca6c6379 Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Fri, 19 May 2023 16:07:02 +0800 Subject: [PATCH 11/13] :bug:fixed cluster --- .../server/cluster/manager/JoinManager.java | 3 +++ .../server/cluster/service/WorkerService.java | 4 ++-- .../server/cluster/util/ClusterUtil.java | 4 +++- .../server/cluster/util/ClusterUtilTest.java | 18 ++++++++++++++++++ 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JoinManager.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JoinManager.java index eaf6462b..f8736911 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JoinManager.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/manager/JoinManager.java @@ -94,6 +94,9 @@ public Boolean doJoin(String hostname, Integer port) { // Refresh current slots. this.refreshManager.refreshCurrentSlots(); + + // Refresh app workers; + this.refreshManager.refreshAppWorkers(); return true; } diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerService.java index 28e7ea63..c73acda3 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerService.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerService.java @@ -177,7 +177,7 @@ public void workerCheck() { workerStartRequest.setAppName(w.getAppName()); workerStartRequest.setWorkerKey(w.getWorkerKey()); - log.info("Scheduling worker start begin!"); + log.info("Scheduling worker start begin! address={}", w.getAddress()); this.workerStart(workerStartRequest); } }); @@ -193,7 +193,7 @@ public void workerCheck() { workerStopRequest.setAddress(w.getAddress()); workerStopRequest.setAppName(w.getAppName()); - log.info("Scheduling worker stop begin!"); + log.info("Scheduling worker stop begin! address={}", w.getAddress()); this.workerStop(workerStopRequest); } }); diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/util/ClusterUtil.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/util/ClusterUtil.java index 07e02179..2c596f8f 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/util/ClusterUtil.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/util/ClusterUtil.java @@ -57,6 +57,7 @@ public static void refreshAppWorkers(List workers) { Map> appWorkers = workers.stream() .map(w -> { WorkerDTO workerDTO = new WorkerDTO(); + workerDTO.setNamespaceId(w.getNamespaceId()); workerDTO.setAppId(w.getAppId()); workerDTO.setWorkerKey(w.getWorkerKey()); workerDTO.setAddress(w.getAddress()); @@ -67,12 +68,13 @@ public static void refreshAppWorkers(List workers) { .collect(Collectors.groupingBy(WorkerDTO::getAppId)); ClusterContext.refreshAppWorkers(appWorkers); + log.info("Refresh app workers {}", appWorkers); } /** * Online workers. * - * @param appId appId + * @param appId appId * @return Set */ public static Set getOnlineWorkers(Long appId) { diff --git a/openjob-server/openjob-server-cluster/src/test/java/io/openjob/server/cluster/util/ClusterUtilTest.java b/openjob-server/openjob-server-cluster/src/test/java/io/openjob/server/cluster/util/ClusterUtilTest.java index 9998136a..1b6f0a44 100644 --- a/openjob-server/openjob-server-cluster/src/test/java/io/openjob/server/cluster/util/ClusterUtilTest.java +++ b/openjob-server/openjob-server-cluster/src/test/java/io/openjob/server/cluster/util/ClusterUtilTest.java @@ -14,6 +14,24 @@ * @since 1.0.0 */ public class ClusterUtilTest { + + @Test + public void testGetKnowServersByOnlyOne() { + Map nodesMap = new HashMap<>(16); + Node currentNode = null; + Node node = new Node(); + node.setStatus(1); + node.setServerId(1L); + node.setIp("127.0.0.1"); + node.setAkkaAddress(String.format("127.0.0.1:%d", 1L)); + nodesMap.put(1L, node); + + currentNode = node; + List knowServers = ClusterUtil.getKnowServers(nodesMap, currentNode, 5); + List expect = new ArrayList<>(); + Assertions.assertEquals(knowServers, expect); + } + @Test public void testGetKnowServersByTwo() { Map nodesMap = new HashMap<>(16); From fc8f68d108eb9a1e3e34c3455b1a88c666112e0e Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Fri, 19 May 2023 16:40:35 +0800 Subject: [PATCH 12/13] :bug:fixed server hostname --- .../java/io/openjob/common/util/IpUtil.java | 12 ++++++++++ .../io/openjob/common/util/IpUtilTest.java | 22 +++++++++++++++++++ .../io/openjob/worker/init/WorkerConfig.java | 8 +++++-- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/openjob-common/src/main/java/io/openjob/common/util/IpUtil.java b/openjob-common/src/main/java/io/openjob/common/util/IpUtil.java index 7e934d7b..4e96b567 100644 --- a/openjob-common/src/main/java/io/openjob/common/util/IpUtil.java +++ b/openjob-common/src/main/java/io/openjob/common/util/IpUtil.java @@ -5,6 +5,7 @@ import java.net.Inet6Address; import java.net.InetAddress; import java.net.NetworkInterface; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Enumeration; import java.util.Iterator; @@ -90,6 +91,17 @@ public static String getLocalAddress() { } } + /** + * Get ip by host + * + * @param host host + * @return String + * @throws UnknownHostException UnknownHostException + */ + public static String getIpByHost(String host) throws UnknownHostException { + return InetAddress.getByName(host).getHostAddress(); + } + /** * Normalize address. * diff --git a/openjob-common/src/test/java/io/openjob/common/util/IpUtilTest.java b/openjob-common/src/test/java/io/openjob/common/util/IpUtilTest.java index 990fa7f3..51c8e215 100644 --- a/openjob-common/src/test/java/io/openjob/common/util/IpUtilTest.java +++ b/openjob-common/src/test/java/io/openjob/common/util/IpUtilTest.java @@ -1,8 +1,12 @@ package io.openjob.common.util; +import org.checkerframework.checker.units.qual.A; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.net.InetAddress; +import java.net.UnknownHostException; + /** * @author stelin swoft@qq.com * @since 1.0.0 @@ -16,4 +20,22 @@ public void testGetLocalIp() { String formatAddress = IpUtil.getFormatAddress(); Assertions.assertNotNull(formatAddress); } + + @Test + public void testGetIpByHost() throws UnknownHostException { + String ip = IpUtil.getIpByHost("localhost"); + Assertions.assertEquals(ip, "127.0.0.1"); + + String ip2 = IpUtil.getIpByHost("127.0.0.1"); + Assertions.assertEquals(ip2, "127.0.0.1"); + + String ip3 = IpUtil.getIpByHost("github.com"); + Assertions.assertNotNull(ip3); + + String ip4 = IpUtil.getIpByHost("20.205.243.166"); + Assertions.assertEquals(ip4, "20.205.243.166"); + + String ip5 = IpUtil.getIpByHost("172.20.1.166"); + Assertions.assertEquals(ip5, "172.20.1.166"); + } } diff --git a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerConfig.java b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerConfig.java index 1d45f3e1..ecab506b 100644 --- a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerConfig.java +++ b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/init/WorkerConfig.java @@ -5,6 +5,7 @@ import io.openjob.worker.constant.WorkerConstant; import lombok.Getter; +import java.net.UnknownHostException; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; @@ -58,7 +59,7 @@ public class WorkerConfig { /** * Init */ - public void init() { + public void init() throws UnknownHostException { // Already initialized if (this.isInit.get()) { return; @@ -74,7 +75,10 @@ public void init() { workerPort = OpenjobConfig.getInteger(WorkerConstant.WORKER_PORT, WorkerConstant.DEFAULT_WORKER_PORT); workerAddress = String.format("%s:%d", workerHost, workerPort); delayEnable = OpenjobConfig.getBoolean(WorkerConstant.WORKER_DELAY_ENABLE, false); - serverHost = OpenjobConfig.getString(WorkerConstant.SERVER_HOST, IpUtil.getLocalAddress()); + + // Server hostname + String serverHostname = OpenjobConfig.getString(WorkerConstant.SERVER_HOST, IpUtil.getLocalAddress()); + serverHost = IpUtil.getIpByHost(serverHostname); serverPort = OpenjobConfig.getInteger(WorkerConstant.SERVER_PORT, WorkerConstant.DEFAULT_SERVER_PORT); // Initialized From d7742241ee79d619e65b981c577e36d0d8fe22f6 Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Fri, 19 May 2023 20:44:56 +0800 Subject: [PATCH 13/13] ::lipstick: --- .../repository/repository/DelayInstanceRepository.java | 5 ++++- .../src/main/java/io/openjob/worker/OpenjobWorker.java | 5 +++++ .../src/main/java/io/openjob/worker/init/WorkerShutdown.java | 3 +++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java index 44215f46..3f2fdd46 100644 --- a/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java +++ b/openjob-server/openjob-server-repository/src/main/java/io/openjob/server/repository/repository/DelayInstanceRepository.java @@ -37,7 +37,8 @@ public interface DelayInstanceRepository extends JpaRepository getDelayTotalCount(List topics, List statuses, Integer deleted); /** @@ -49,6 +50,8 @@ public interface DelayInstanceRepository extends JpaRepository