2020-08-11 23:13:31 +03:00
|
|
|
const fastq = require('fastq');
|
2020-11-05 07:36:29 +03:00
|
|
|
const later = require('@breejs/later');
|
2020-08-11 23:13:31 +03:00
|
|
|
const pWaitFor = require('p-wait-for');
|
2020-11-05 07:36:29 +03:00
|
|
|
const isCronExpression = require('./is-cron-expression');
|
2020-08-11 23:13:31 +03:00
|
|
|
|
|
|
|
const worker = async (task, callback) => {
|
|
|
|
try {
|
|
|
|
let result = await task();
|
|
|
|
callback(null, result);
|
|
|
|
} catch (error) {
|
|
|
|
callback(error);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
const handler = (error, result) => {
|
|
|
|
if (error) {
|
|
|
|
throw error;
|
|
|
|
}
|
|
|
|
// Can potentially standardise the result here
|
|
|
|
return result;
|
|
|
|
};
|
|
|
|
|
|
|
|
class JobManager {
|
|
|
|
constructor(logging) {
|
|
|
|
this.queue = fastq(this, worker, 1);
|
2020-11-05 07:36:29 +03:00
|
|
|
this.schedule = [];
|
2020-08-11 23:13:31 +03:00
|
|
|
this.logging = logging;
|
|
|
|
}
|
|
|
|
|
2020-11-05 06:41:16 +03:00
|
|
|
/**
|
|
|
|
* Adds job to queue
|
|
|
|
*
|
2020-11-05 07:36:51 +03:00
|
|
|
* @param {Function} job - function to be executed in the queue
|
|
|
|
* @param {Object} [data] - data to be passed into the job
|
2020-11-05 06:41:16 +03:00
|
|
|
*/
|
2020-08-11 23:13:31 +03:00
|
|
|
addJob(job, data) {
|
2020-11-05 06:42:23 +03:00
|
|
|
this.logging.info('Adding one off job to the queue');
|
|
|
|
|
2020-08-11 23:13:31 +03:00
|
|
|
this.queue.push(async () => {
|
|
|
|
await job(data);
|
|
|
|
}, handler);
|
|
|
|
}
|
|
|
|
|
2020-11-05 07:36:29 +03:00
|
|
|
/**
|
|
|
|
* Schedules recuring job
|
|
|
|
*
|
|
|
|
* @param {Function|String} job - function or path to a file defining a job
|
|
|
|
* @param {Object} data - data to be passed into the joba
|
|
|
|
* @param {String} when - cron or human readable schedule format
|
|
|
|
*/
|
|
|
|
scheduleJob(job, data, when) {
|
|
|
|
let schedule;
|
|
|
|
|
|
|
|
schedule = later.parse.text(when);
|
|
|
|
|
|
|
|
if (isCronExpression(when)) {
|
|
|
|
schedule = later.parse.cron(when);
|
|
|
|
}
|
|
|
|
|
|
|
|
if ((schedule.error && schedule.error !== -1) || schedule.schedules.length === 0) {
|
|
|
|
throw new Error('Invalid schedule format');
|
|
|
|
}
|
|
|
|
|
|
|
|
this.logging.info(`Scheduling job. Next run on: ${later.schedule(schedule).next()}`);
|
|
|
|
|
|
|
|
const cancelInterval = later.setInterval(() => {
|
|
|
|
this.logging.info(`Scheduled job added to the queue.`);
|
|
|
|
this.addJob(job, data);
|
|
|
|
}, schedule);
|
|
|
|
|
|
|
|
this.schedule.push(cancelInterval);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @param {import('p-wait-for').Options} [options]
|
|
|
|
*/
|
2020-08-11 23:13:31 +03:00
|
|
|
async shutdown(options) {
|
2020-11-05 07:36:29 +03:00
|
|
|
this.schedule.forEach((cancelHandle) => {
|
|
|
|
cancelHandle.clear();
|
|
|
|
});
|
|
|
|
|
2020-08-11 23:13:31 +03:00
|
|
|
if (this.queue.idle()) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
this.logging.warn('Waiting for busy job queue');
|
|
|
|
|
|
|
|
await pWaitFor(() => this.queue.idle() === true, options);
|
|
|
|
|
|
|
|
this.logging.warn('Job queue finished');
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = JobManager;
|