Added experimental job scheduling feature

no issue

- This method should be used with caution and should serve as a playground for upcoming new feature of scheduled jobs support
This commit is contained in:
Naz 2020-11-05 17:36:29 +13:00
parent 3da365999d
commit 1b1794063f
3 changed files with 54 additions and 0 deletions

View File

@ -1,5 +1,7 @@
const fastq = require('fastq');
const later = require('@breejs/later');
const pWaitFor = require('p-wait-for');
const isCronExpression = require('./is-cron-expression');
const worker = async (task, callback) => {
try {
@ -21,6 +23,7 @@ const handler = (error, result) => {
class JobManager {
constructor(logging) {
this.queue = fastq(this, worker, 1);
this.schedule = [];
this.logging = logging;
}
@ -38,7 +41,44 @@ class JobManager {
}, handler);
}
/**
* Schedules recuring job
*
* @param {Function|String} job - function or path to a file defining a job
* @param {Object} data - data to be passed into the joba
* @param {String} when - cron or human readable schedule format
*/
scheduleJob(job, data, when) {
let schedule;
schedule = later.parse.text(when);
if (isCronExpression(when)) {
schedule = later.parse.cron(when);
}
if ((schedule.error && schedule.error !== -1) || schedule.schedules.length === 0) {
throw new Error('Invalid schedule format');
}
this.logging.info(`Scheduling job. Next run on: ${later.schedule(schedule).next()}`);
const cancelInterval = later.setInterval(() => {
this.logging.info(`Scheduled job added to the queue.`);
this.addJob(job, data);
}, schedule);
this.schedule.push(cancelInterval);
}
/**
* @param {import('p-wait-for').Options} [options]
*/
async shutdown(options) {
this.schedule.forEach((cancelHandle) => {
cancelHandle.clear();
});
if (this.queue.idle()) {
return;
}

View File

@ -24,6 +24,7 @@
"sinon": "9.2.1"
},
"dependencies": {
"@breejs/later": "4.0.2",
"cron-parser": "2.17.0",
"fastq": "1.9.0",
"p-wait-for": "3.1.0"

View File

@ -9,5 +9,18 @@ describe('Job Manager', function () {
const jobManager = new JobManager();
should.exist(jobManager.addJob);
should.exist(jobManager.scheduleJob);
});
describe('Schedule Job', function () {
it ('fails to run for invalid scheduling expression', function () {
const jobManager = new JobManager();
try {
jobManager.scheduleJob(() => {}, {}, 'invalid expression');
} catch (err) {
err.message.should.equal('Invalid schedule format');
}
});
});
});