From bc9e8bbc0de833896d5a127c7de1c4a45d946f0c Mon Sep 17 00:00:00 2001 From: code-xhyun Date: Thu, 15 Aug 2024 03:30:06 +0900 Subject: [PATCH] ix: prevent job execution on server restart when resumeOnRestart is false --- src/job/run.ts | 23 ++++++++++++++++++----- src/pulse/index.ts | 2 ++ src/pulse/resume-on-restart.ts | 9 ++++++++- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/job/run.ts b/src/job/run.ts index 9fa9cb6..2d56d67 100644 --- a/src/job/run.ts +++ b/src/job/run.ts @@ -16,12 +16,14 @@ export const run: RunMethod = async function (this: Job) { return new Promise(async (resolve, reject) => { this.attrs.lastRunAt = new Date(); - this.attrs.runCount = (this.attrs.runCount || 0) + 1; + + const previousRunAt = this.attrs.nextRunAt; debug('[%s:%s] setting lastRunAt to: %s', this.attrs.name, this.attrs._id, this.attrs.lastRunAt.toISOString()); this.computeNextRunAt(); await this.save(); let finished = false; + let resumeOnRestartSkipped = false; const jobCallback = async (error?: Error, result?: unknown) => { // We don't want to complete the job multiple times if (finished) { @@ -33,11 +35,13 @@ export const run: RunMethod = async function (this: Job) { if (error) { this.fail(error); } else { - this.attrs.lastFinishedAt = new Date(); - this.attrs.finishedCount = (this.attrs.finishedCount || 0) + 1; + if (!resumeOnRestartSkipped) { + this.attrs.lastFinishedAt = new Date(); + this.attrs.finishedCount = (this.attrs.finishedCount || 0) + 1; - if (this.attrs.shouldSaveResult && result) { - this.attrs.result = result; + if (this.attrs.shouldSaveResult && result) { + this.attrs.result = result; + } } } @@ -81,6 +85,15 @@ export const run: RunMethod = async function (this: Job) { throw new JobError('Undefined job'); } + if (!this.pulse._resumeOnRestart && previousRunAt && this.pulse._readyAt >= previousRunAt) { + debug('[%s:%s] job resumeOnRestart skipped', this.attrs.name, this.attrs._id); + resumeOnRestartSkipped = true; + await jobCallback(undefined, 'skipped'); + return; + } + + this.attrs.runCount = (this.attrs.runCount || 0) + 1; + if (definition.fn.length === 2) { debug('[%s:%s] process function being called', this.attrs.name, this.attrs._id); await definition.fn(this, jobCallback); diff --git a/src/pulse/index.ts b/src/pulse/index.ts index 39fc07f..b85867c 100644 --- a/src/pulse/index.ts +++ b/src/pulse/index.ts @@ -102,6 +102,7 @@ class Pulse extends EventEmitter { _collection!: Collection; _nextScanAt: any; _processInterval: any; + _readyAt: Date; /** * Constructs a new Pulse object. @@ -143,6 +144,7 @@ class Pulse extends EventEmitter { this._ready = new Promise((resolve) => { this.once('ready', resolve); }); + this._readyAt = new Date(); this.init(config, cb); } diff --git a/src/pulse/resume-on-restart.ts b/src/pulse/resume-on-restart.ts index 00dd860..7b92834 100644 --- a/src/pulse/resume-on-restart.ts +++ b/src/pulse/resume-on-restart.ts @@ -22,7 +22,14 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res .updateMany( { $or: [ - { lockedAt: { $exists: true }, lastFinishedAt: { $exists: false } }, + { + lockedAt: { $exists: true }, + $expr: { $eq: ['$runCount', '$finishedCount'] }, + }, + { + lockedAt: { $exists: true }, + lastFinishedAt: { $exists: false }, + }, { $and: [ { lockedAt: { $exists: false } },