Ghost/ghost/job-manager/lib/job-manager.js
Naz 4b18cbcbdb Added support for worker threads in scheduled jobs
no issue

- When jobs are performing CPU intensive tasks they block main process'
event loop. They also can cause memory leaks or unexpected crashes
effectively crashing the parent proccess. To address these issues jobs need to be performed off of main
process. Worker Threads (https://nodejs.org/dist/latest-v12.x/docs/api/worker_threads.html)
are the best candidate for such work.
- These changes introduce an integration on top of bree
(https://github.com/breejs/bree/) which allows to run recurring
jobs in worker thereads. It falls back to child process execution for
Node v10 running without `--experimental-worker` flag.
- bree was chosen not only because it gives a polyfill for older Node
versions. It has support for some of the future use-cases Ghost is looking to
implement, like scheduled jobs.
- This changeset also includes a complete example of job running on an
interval with a possibility for graceful shutdown
2020-11-19 17:59:36 +13:00

128 lines
3.8 KiB
JavaScript

const path = require('path');
const fastq = require('fastq');
const later = require('@breejs/later');
const Bree = require('bree');
const pWaitFor = require('p-wait-for');
const errors = require('@tryghost/errors');
const isCronExpression = require('./is-cron-expression');
const assembleBreeJob = require('./assemble-bree-job');
const worker = async (task, callback) => {
try {
let result = await task();
callback(null, result);
} catch (error) {
callback(error);
}
};
const handler = (error, result) => {
if (error) {
// TODO: this handler should not be throwing as this blocks the queue
// throw error;
}
// Can potentially standardise the result here
return result;
};
class JobManager {
constructor(logging) {
this.queue = fastq(this, worker, 1);
this.bree = new Bree({
root: false, // set this to `false` to prevent requiring a root directory of jobs
hasSeconds: true, // precission is needed to avoid task ovelaps after immidiate execution
logger: logging
});
this.logging = logging;
}
/**
* Adds job to queue
*
* @param {Function|String} job - function or path to a module defining a job
* @param {Object} [data] - data to be passed into the job
*/
addJob(job, data) {
this.logging.info('Adding one off job to the queue');
this.queue.push(async () => {
try {
if (typeof job === 'function') {
await job(data);
} else {
await require(job)(data);
}
} catch (err) {
// NOTE: each job should be written in a safe way and handle all errors internally
// if the error is caught here jobs implementaton should be changed
this.logging.error(new errors.IgnitionError({
level: 'critical',
errorType: 'UnhandledJobError',
message: 'Processed job threw an unhandled error',
context: (typeof job === 'function') ? 'function' : job,
err
}));
throw err;
}
}, handler);
}
/**
* Schedules recuring job
*
* @param {String} when - cron or human readable schedule format
* @param {Function|String} job - function or path to a module defining a job
* @param {Object} [data] - data to be passed into the job
* @param {String} [name] - job name
*/
scheduleJob(when, job, data, name) {
let schedule;
if (!name) {
if (typeof job === 'string') {
name = path.parse(job).name;
} else {
throw new Error('Name parameter should be present if job is a function');
}
}
schedule = later.parse.text(when);
if (isCronExpression(when)) {
schedule = later.parse.cron(when);
}
if ((schedule.error && schedule.error !== -1) || schedule.schedules.length === 0) {
throw new Error('Invalid schedule format');
}
this.logging.info(`Scheduling job. Next run on: ${later.schedule(schedule).next()}`);
const breeJob = assembleBreeJob(when, job, data, name);
this.bree.add(breeJob);
return this.bree.start(name);
}
/**
* @param {import('p-wait-for').Options} [options]
*/
async shutdown(options) {
await this.bree.stop();
if (this.queue.idle()) {
return;
}
this.logging.warn('Waiting for busy job queue');
await pWaitFor(() => this.queue.idle() === true, options);
this.logging.warn('Job queue finished');
}
}
module.exports = JobManager;