diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java index f5d877c2..954625fa 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/constant/CodeEnum.java @@ -46,6 +46,7 @@ public enum CodeEnum implements CodeExceptionAssert { HTTP_PROCESSOR_STRING_V_INVALID(422, "Http string value cat not be empty!"), JOB_CRON_INTERVAL_INVALID(423, "Job cron interval invalid!"), JOB_FIXED_RATE_INTERVAL_INVALID(424, "Job fixed rate interval invalid!"), + JOB_SECOND_DELAY_INTERVAL_INVALID(425, "Job second delay interval invalid!"), // Delay DELAY_TOPIC_EXIST(500, "Topic is exist!"), diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceTaskServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceTaskServiceImpl.java index 84a531db..549f2524 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceTaskServiceImpl.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobInstanceTaskServiceImpl.java @@ -210,13 +210,14 @@ private PageVO getTaskListBySecond(ListTaskRequest request) { // Pull task from worker at first page if (NumberUtils.INTEGER_ZERO.equals(request.getPage() - 1)) { + Long pullCircleId = 0L; JobInstanceTask latestParentTask = this.jobInstanceTaskDAO.getLatestParentTask(request.getJobInstanceId(), TaskConstant.DEFAULT_PARENT_ID); - if (Objects.isNull(latestParentTask)) { - return pageVO; + if (Objects.nonNull(latestParentTask)) { + pullCircleId = latestParentTask.getCircleId(); } // Add pull tasks - List taskList = this.pullTaskListFromWorker(latestParentTask.getCircleId(), jobInstance); + List taskList = this.pullTaskListFromWorker(pullCircleId, jobInstance); pageVO.getList().addAll(0, taskList); } return pageVO; diff --git a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java index 1a78fd38..b7d7f635 100644 --- a/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java +++ b/openjob-server/openjob-server-admin/src/main/java/io/openjob/server/admin/service/impl/JobServiceImpl.java @@ -280,6 +280,7 @@ public PageVO getPageList(ListJobRequest request) { * @param request request */ private void validAndInitJob(AddJobRequest request) { + // Processor type valid if (ProcessorTypeEnum.isShell(request.getProcessorType())) { // Shell if (StringUtils.isBlank(request.getShellProcessorType())) { @@ -325,8 +326,9 @@ private void validAndInitJob(AddJobRequest request) { request.setProcessorInfo(JsonUtil.encode(httpProcessor)); } - // ShardingParams + // Execute type valid if (ExecuteTypeEnum.isSharding(request.getExecuteType())) { + // ShardingParams CodeEnum.SHARDING_PARAMS_INVALID.assertIsFalse(StringUtils.isBlank(request.getShardingParams())); // Format @@ -338,20 +340,25 @@ private void validAndInitJob(AddJobRequest request) { request.setParams(request.getShardingParams()); } - // Cron + // Time expression type valid if (TimeExpressionTypeEnum.isCron(request.getTimeExpressionType())) { + // Cron long one = this.parseTimeExpression(request.getTimeExpression(), null); long two = this.parseTimeExpression(request.getTimeExpression(), new Date(one * 1000L)); if ((two - one) < TimeUnit.MINUTES.toSeconds(1)) { CodeEnum.JOB_CRON_INTERVAL_INVALID.throwException(); } - } - - // Fixed rate - if (TimeExpressionTypeEnum.isFixedRate(request.getTimeExpressionType())) { + } else if (TimeExpressionTypeEnum.isFixedRate(request.getTimeExpressionType())) { + // Fixed rate if (TimeUnit.MINUTES.toSeconds(1) >= request.getTimeExpressionValue()) { CodeEnum.JOB_FIXED_RATE_INTERVAL_INVALID.throwException(); } + } else if (TimeExpressionTypeEnum.isSecondDelay(request.getTimeExpressionType())) { + // Second delay + long delay = Optional.ofNullable(request.getTimeExpressionValue()).orElse(0L); + if (delay <= 0 || delay > 60) { + CodeEnum.JOB_SECOND_DELAY_INTERVAL_INVALID.throwException(); + } } } @@ -392,6 +399,9 @@ private void updateJobBySecond(Job updateJob) { private void createSecondJobInstance(Job job) { Long timestamp = DateUtil.timestamp(); JobInstance jobInstance = BeanMapperUtil.map(job, JobInstance.class); + + // Fixed save to update bug + jobInstance.setId(null); jobInstance.setJobId(job.getId()); jobInstance.setDeleteTime(0L); jobInstance.setDeleted(CommonConstant.NO); diff --git a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/AbstractDelayScheduler.java b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/AbstractDelayScheduler.java index 7b54703e..5896b24c 100644 --- a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/AbstractDelayScheduler.java +++ b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/AbstractDelayScheduler.java @@ -106,5 +106,16 @@ public AbstractRunnable(Long currentSlotId) { public void setFinish(Boolean finish) { this.finish.set(finish); } + + /** + * Fail sleep + */ + protected void failSleep(){ + try { + Thread.sleep(2000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } } diff --git a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayAddListScheduler.java b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayAddListScheduler.java index 62b00ffd..f057fb2b 100644 --- a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayAddListScheduler.java +++ b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayAddListScheduler.java @@ -112,6 +112,7 @@ public void run() { break; } catch (Throwable ex) { log.error("List delay instance failed!", ex); + this.failSleep(); } } diff --git a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayDeleteListScheduler.java b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayDeleteListScheduler.java index 5a29b122..0b094928 100644 --- a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayDeleteListScheduler.java +++ b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayDeleteListScheduler.java @@ -99,6 +99,7 @@ public void run() { break; } catch (Throwable ex) { log.error("Delete list delay instance failed!", ex); + this.failSleep(); } } diff --git a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayFailZsetScheduler.java b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayFailZsetScheduler.java index 26fe2834..70369ef6 100644 --- a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayFailZsetScheduler.java +++ b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayFailZsetScheduler.java @@ -103,6 +103,7 @@ public void run() { break; } catch (Throwable ex) { log.error("Range delay fail instance failed!", ex); + this.failSleep(); } } diff --git a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayStatusListScheduler.java b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayStatusListScheduler.java index bc683960..7a503e0f 100644 --- a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayStatusListScheduler.java +++ b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayStatusListScheduler.java @@ -102,6 +102,7 @@ public void run() { break; } catch (Throwable ex) { log.error("Status list delay instance failed!", ex); + this.failSleep(); } } diff --git a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayZsetScheduler.java b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayZsetScheduler.java index 4ecc8e66..40df4310 100644 --- a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayZsetScheduler.java +++ b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/scheduler/DelayZsetScheduler.java @@ -117,6 +117,7 @@ public void run() { break; } catch (Throwable ex) { log.error("Range delay instance failed!", ex); + this.failSleep(); } } diff --git a/openjob-server/openjob-server-starter/src/main/resources/banner.txt b/openjob-server/openjob-server-starter/src/main/resources/banner.txt index 0ce543de..c979775c 100644 --- a/openjob-server/openjob-server-starter/src/main/resources/banner.txt +++ b/openjob-server/openjob-server-starter/src/main/resources/banner.txt @@ -5,6 +5,8 @@ ${AnsiColor.GREEN} | |_| | | |_) | | __/ | | | | | | | (_) | | |_) | \___/ | .__/ \___| |_| |_| _/ | \___/ |_.__/ |_| |__/ - :: Version :: Openjob Server(v1.0.6) Spring Boot(v${spring-boot.version}) Akka(v2.6.19) + :: Version :: Openjob Server(v1.0.7) Spring Boot(v${spring-boot.version}) Akka(v2.6.19) :: Website :: https://openjob.io + :: Author :: https://github.com/stelin :: Github :: https://github.com/open-job/openjob + diff --git a/openjob-server/openjob-server-starter/src/main/resources/db/migration/mysql/V5__v1.0.7_release.sql b/openjob-server/openjob-server-starter/src/main/resources/db/migration/mysql/V5__v1.0.7_release.sql index 27f4adaa..83cb8c37 100644 --- a/openjob-server/openjob-server-starter/src/main/resources/db/migration/mysql/V5__v1.0.7_release.sql +++ b/openjob-server/openjob-server-starter/src/main/resources/db/migration/mysql/V5__v1.0.7_release.sql @@ -3,8 +3,7 @@ ALTER TABLE `job_instance` ADD `dispatch_version` bigint(20) NOT NULL DEFAULT '0' COMMENT 'Dispatch version' AFTER `last_report_time`; ALTER TABLE `job_instance` - ADD `execute_once` tinyint(2) NOT NULL DEFAULT '2' COMMENT 'Execute once, 1=yes 2=no' AFTER `execute_once`; - + ADD `execute_once` tinyint(2) NOT NULL DEFAULT '2' COMMENT 'Execute once, 1=yes 2=no' AFTER `execute_type`; #`job_instance_task` # ------------------------------------------------------------ diff --git a/openjob-server/openjob-server-starter/src/main/resources/openjob.conf b/openjob-server/openjob-server-starter/src/main/resources/openjob.conf index fe4c6e98..5cf296ed 100644 --- a/openjob-server/openjob-server-starter/src/main/resources/openjob.conf +++ b/openjob-server/openjob-server-starter/src/main/resources/openjob.conf @@ -1,4 +1,7 @@ akka { + # Dead letters log + log-dead-letters = off + # Coordinated configure coordinated-shutdown { terminate-actor-system = on diff --git a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/master/AbstractTaskMaster.java b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/master/AbstractTaskMaster.java index 63a62d91..7ade11d4 100644 --- a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/master/AbstractTaskMaster.java +++ b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/master/AbstractTaskMaster.java @@ -135,8 +135,9 @@ public void completeTask() throws InterruptedException { // Remove task from manager this.removeTaskFromManager(); - // Not second delay task or execute once - if (!this.isSecondDelay() || CommonConstant.YES.equals(this.jobInstanceDTO.getExecuteOnce())) { + // Stop complete: any task to destroy task container + // Normal complete: not second delay task or execute once to destroy task container + if (this.stopping.get() > 0 || !this.isSecondDelay() || CommonConstant.YES.equals(this.jobInstanceDTO.getExecuteOnce())) { // When task complete reset status. this.running.set(false); diff --git a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/master/StandaloneTaskMaster.java b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/master/StandaloneTaskMaster.java index d007df22..eb443a4b 100644 --- a/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/master/StandaloneTaskMaster.java +++ b/openjob-worker/openjob-worker-core/src/main/java/io/openjob/worker/master/StandaloneTaskMaster.java @@ -61,7 +61,6 @@ public void stop(Integer type) { // Second delay to shut down scheduler if (this.isSecondDelay()) { this.secondDelayService.shutdown(); - return; } super.stop(type);