diff --git a/src/job/index.ts b/src/job/index.ts index 33b4a38..3f768a7 100644 --- a/src/job/index.ts +++ b/src/job/index.ts @@ -186,7 +186,7 @@ class Job { attrs: JobAttributes; constructor(options: Modify, { _id?: mongodb.ObjectId }>) { - const { pulse, type, nextRunAt, ...args } = options ?? {}; + const { pulse, type, nextRunAt, repeatAt, repeatInterval, lastFinishedAt, ...args } = options ?? {}; // Save Pulse instance this.pulse = pulse; @@ -213,7 +213,10 @@ class Job { name: attrs.name || '', priority: attrs.priority, type: type || 'once', - nextRunAt: nextRunAt || new Date(), + // if a job that's non-recurring has a lastFinishedAt (finished the job), do not default nextRunAt to now + // only if it will be defaulted either by explicitly setting it or by computing it computeNextRunAt + nextRunAt: + repeatAt || repeatInterval ? nextRunAt || new Date() : !lastFinishedAt ? nextRunAt || new Date() : nextRunAt, }; } diff --git a/src/pulse/resume-on-restart.ts b/src/pulse/resume-on-restart.ts index 8b77a3b..baf6afb 100644 --- a/src/pulse/resume-on-restart.ts +++ b/src/pulse/resume-on-restart.ts @@ -1,5 +1,6 @@ import createDebugger from 'debug'; import { Pulse } from '.'; +import { Job } from '../job'; const debug = createDebugger('pulse:resumeOnRestart'); @@ -18,6 +19,8 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res if (this._collection && this._resumeOnRestart) { const now = new Date(); + + // Non-recurring jobs this._collection .updateMany( { @@ -25,11 +28,14 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res { lockedAt: { $exists: true }, nextRunAt: { $ne: null }, - $or: [{ $expr: { $eq: ['$runCount', '$finishedCount'] } }, { lastFinishedAt: { $exists: false } }], + $or: [ + { $expr: { $eq: ['$runCount', '$finishedCount'] } }, + { $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, + ], }, { lockedAt: { $exists: false }, - lastFinishedAt: { $exists: false }, + $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }], nextRunAt: { $lte: now, $ne: null }, }, ], @@ -41,7 +47,50 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res ) .then((result) => { if (result.modifiedCount > 0) { - debug('resuming unfinished %d jobs(%s)', result.modifiedCount, now.toISOString()); + debug('Resumed %d unfinished standard jobs (%s)', result.modifiedCount, now.toISOString()); + } + }); + + // Recurring jobs + this._collection + .find({ + $and: [ + { $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] }, + { $or: [{ nextRunAt: { $lte: now } }, { nextRunAt: { $exists: false } }, { nextRunAt: null }] }, + ], + }) + .toArray() + .then((jobs) => { + const updates = jobs.map((jobData) => { + const job = new Job({ + pulse: this, + name: jobData.name || '', + data: jobData.data || {}, + type: jobData.type || 'normal', + priority: jobData.priority || 'normal', + shouldSaveResult: jobData.shouldSaveResult || false, + attempts: jobData.attempts || 0, + backoff: jobData.backoff, + ...jobData, + }); + + job.computeNextRunAt(); + + return this._collection.updateOne( + { _id: job.attrs._id }, + { + $set: { nextRunAt: job.attrs.nextRunAt }, + $unset: { lockedAt: undefined, lastModifiedBy: undefined, lastRunAt: undefined }, + } + ); + }); + + return Promise.all(updates); + }) + .then((results) => { + const modifiedCount = results.filter((res) => res.modifiedCount > 0).length; + if (modifiedCount > 0) { + debug('Resumed %d recurring jobs (%s)', modifiedCount, now.toISOString()); } }); } diff --git a/test/unit/pulse.spec.ts b/test/unit/pulse.spec.ts index 172b424..bfb566b 100644 --- a/test/unit/pulse.spec.ts +++ b/test/unit/pulse.spec.ts @@ -218,6 +218,132 @@ describe('Test Pulse', () => { test('returns itself', () => { expect(globalPulseInstance.resumeOnRestart(false)).toEqual(globalPulseInstance); }); + + test('should not reschedule successfully finished non-recurring jobs', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.lastFinishedAt = new Date(); + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt).toBeNull(); + }); + + test('should resume non-recurring jobs on restart', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.nextRunAt = new Date(Date.now() - 1000); + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now() - 100); + }); + + test('should resume recurring jobs on restart - interval', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.repeatInterval = '5 minutes'; + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + }); + + test('should resume recurring jobs on restart - cron', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.repeatInterval = '*/5 * * * *'; + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + }); + + test('should resume recurring jobs on restart - repeatAt', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.repeatAt = '1:00 am'; + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + }); + + test('should not modify jobs with existing nextRunAt', async () => { + const futureDate = new Date(Date.now() + 60 * 60 * 1000); + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.nextRunAt = futureDate; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt?.getTime()).toEqual(futureDate.getTime()); + }); + + test('should handle jobs that started but have not finished (non-recurring)', async () => { + const job = globalPulseInstance.create('processData', { data: 'sample' }); + job.attrs.nextRunAt = null; + job.attrs.lockedAt = new Date(); + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0]; + + const now = Date.now(); + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100); + }); + + test('should handle recurring jobs that started but have not finished', async () => { + const job = globalPulseInstance.create('processData', { data: 'sample' }); + job.attrs.repeatInterval = '10 minutes'; + job.attrs.lockedAt = new Date(); + job.attrs.nextRunAt = new Date(Date.now() + 10000); + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0]; + expect(updatedJob.attrs.lockedAt).not.toBeNull(); + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + }); + + test('should handle interrupted recurring jobs after server recovery', async () => { + const job = globalPulseInstance.create('processData', { data: 'sample' }); + job.attrs.repeatInterval = '5 minutes'; + job.attrs.lastModifiedBy = 'server_crash'; + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0]; + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + expect(updatedJob.attrs.lastModifiedBy).not.toEqual('server_crash'); + }); + + test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => { + const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + job.attrs.lastFinishedAt = new Date(Date.now() - 10000); + job.attrs.nextRunAt = null; + await job.save(); + + await globalPulseInstance.resumeOnRestart(); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + expect(updatedJob.attrs.nextRunAt).toBeNull(); + }); }); });