Skip to content

Commit

Permalink
fix: Fix the resumeOnRestart function
Browse files Browse the repository at this point in the history
Fix the resumeOnRestart function to handle correctl the recurring jobs + do not run finished non-recurrent jobs on restart + add unit tests
  • Loading branch information
code-xhyun committed Nov 19, 2024
2 parents b04171d + 5a3e1fa commit ec10759
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 5 deletions.
7 changes: 5 additions & 2 deletions src/job/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class Job<T extends JobAttributesData = JobAttributesData> {
attrs: JobAttributes<T>;

constructor(options: Modify<JobAttributes<T>, { _id?: mongodb.ObjectId }>) {
const { pulse, type, nextRunAt, ...args } = options ?? {};
const { pulse, type, nextRunAt, repeatAt, repeatInterval, lastFinishedAt, ...args } = options ?? {};

// Save Pulse instance
this.pulse = pulse;
Expand All @@ -213,7 +213,10 @@ class Job<T extends JobAttributesData = JobAttributesData> {
name: attrs.name || '',
priority: attrs.priority,
type: type || 'once',
nextRunAt: nextRunAt || new Date(),
// if a job that's non-recurring has a lastFinishedAt (finished the job), do not default nextRunAt to now
// only if it will be defaulted either by explicitly setting it or by computing it computeNextRunAt
nextRunAt:
repeatAt || repeatInterval ? nextRunAt || new Date() : !lastFinishedAt ? nextRunAt || new Date() : nextRunAt,
};
}

Expand Down
55 changes: 52 additions & 3 deletions src/pulse/resume-on-restart.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import createDebugger from 'debug';
import { Pulse } from '.';
import { Job } from '../job';

const debug = createDebugger('pulse:resumeOnRestart');

Expand All @@ -18,18 +19,23 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res

if (this._collection && this._resumeOnRestart) {
const now = new Date();

// Non-recurring jobs
this._collection
.updateMany(
{
$or: [
{
lockedAt: { $exists: true },
nextRunAt: { $ne: null },
$or: [{ $expr: { $eq: ['$runCount', '$finishedCount'] } }, { lastFinishedAt: { $exists: false } }],
$or: [
{ $expr: { $eq: ['$runCount', '$finishedCount'] } },
{ $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] },
],
},
{
lockedAt: { $exists: false },
lastFinishedAt: { $exists: false },
$or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }],
nextRunAt: { $lte: now, $ne: null },
},
],
Expand All @@ -41,7 +47,50 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
)
.then((result) => {
if (result.modifiedCount > 0) {
debug('resuming unfinished %d jobs(%s)', result.modifiedCount, now.toISOString());
debug('Resumed %d unfinished standard jobs (%s)', result.modifiedCount, now.toISOString());
}
});

// Recurring jobs
this._collection
.find({
$and: [
{ $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] },
{ $or: [{ nextRunAt: { $lte: now } }, { nextRunAt: { $exists: false } }, { nextRunAt: null }] },
],
})
.toArray()
.then((jobs) => {
const updates = jobs.map((jobData) => {
const job = new Job({
pulse: this,
name: jobData.name || '',
data: jobData.data || {},
type: jobData.type || 'normal',
priority: jobData.priority || 'normal',
shouldSaveResult: jobData.shouldSaveResult || false,
attempts: jobData.attempts || 0,
backoff: jobData.backoff,
...jobData,
});

job.computeNextRunAt();

return this._collection.updateOne(
{ _id: job.attrs._id },
{
$set: { nextRunAt: job.attrs.nextRunAt },
$unset: { lockedAt: undefined, lastModifiedBy: undefined, lastRunAt: undefined },
}
);
});

return Promise.all(updates);
})
.then((results) => {
const modifiedCount = results.filter((res) => res.modifiedCount > 0).length;
if (modifiedCount > 0) {
debug('Resumed %d recurring jobs (%s)', modifiedCount, now.toISOString());
}
});
}
Expand Down
126 changes: 126 additions & 0 deletions test/unit/pulse.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,132 @@ describe('Test Pulse', () => {
test('returns itself', () => {
expect(globalPulseInstance.resumeOnRestart(false)).toEqual(globalPulseInstance);
});

test('should not reschedule successfully finished non-recurring jobs', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.lastFinishedAt = new Date();
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).toBeNull();
});

test('should resume non-recurring jobs on restart', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.nextRunAt = new Date(Date.now() - 1000);
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now() - 100);
});

test('should resume recurring jobs on restart - interval', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.repeatInterval = '5 minutes';
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
});

test('should resume recurring jobs on restart - cron', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.repeatInterval = '*/5 * * * *';
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
});

test('should resume recurring jobs on restart - repeatAt', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.repeatAt = '1:00 am';
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
});

test('should not modify jobs with existing nextRunAt', async () => {
const futureDate = new Date(Date.now() + 60 * 60 * 1000);
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.nextRunAt = futureDate;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt?.getTime()).toEqual(futureDate.getTime());
});

test('should handle jobs that started but have not finished (non-recurring)', async () => {
const job = globalPulseInstance.create('processData', { data: 'sample' });
job.attrs.nextRunAt = null;
job.attrs.lockedAt = new Date();
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];

const now = Date.now();
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100);
});

test('should handle recurring jobs that started but have not finished', async () => {
const job = globalPulseInstance.create('processData', { data: 'sample' });
job.attrs.repeatInterval = '10 minutes';
job.attrs.lockedAt = new Date();
job.attrs.nextRunAt = new Date(Date.now() + 10000);
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];
expect(updatedJob.attrs.lockedAt).not.toBeNull();
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
});

test('should handle interrupted recurring jobs after server recovery', async () => {
const job = globalPulseInstance.create('processData', { data: 'sample' });
job.attrs.repeatInterval = '5 minutes';
job.attrs.lastModifiedBy = 'server_crash';
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
expect(updatedJob.attrs.lastModifiedBy).not.toEqual('server_crash');
});

test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.lastFinishedAt = new Date(Date.now() - 10000);
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).toBeNull();
});
});
});

Expand Down

0 comments on commit ec10759

Please sign in to comment.