diff --git a/ghost/core/core/server/services/members/service.js b/ghost/core/core/server/services/members/service.js
index b6fd90e792..ff125d207e 100644
--- a/ghost/core/core/server/services/members/service.js
+++ b/ghost/core/core/server/services/members/service.js
@@ -153,7 +153,7 @@ module.exports = {
job: stripeService.migrations.execute.bind(stripeService.migrations)
});
- await jobsService.awaitCompletion(membersMigrationJobName);
+ await jobsService.awaitOneOffCompletion(membersMigrationJobName);
}
}
diff --git a/ghost/core/test/e2e-api/admin/members-exporter.test.js b/ghost/core/test/e2e-api/admin/members-exporter.test.js
index 7902a54aa6..47a8274950 100644
--- a/ghost/core/test/e2e-api/admin/members-exporter.test.js
+++ b/ghost/core/test/e2e-api/admin/members-exporter.test.js
@@ -150,13 +150,13 @@ describe('Members API — exportCSV', function () {
})
});
- const labelsList = labels.map(label => label.get('name')).join(',');
+ const labelsList = labels.map(label => label.get('name')).sort().join(',');
await testOutput(member, (row) => {
basicAsserts(member, row);
should(row.subscribed_to_emails).eql('false');
should(row.complimentary_plan).eql('');
- should(row.labels).eql(labelsList);
+ should(row.labels.split(',').sort().join(',')).eql(labelsList);
should(row.tiers).eql('');
}, [`filter=label:${labels[0].get('slug')}`, 'filter=subscribed:false']);
});
diff --git a/ghost/core/test/integration/services/email-service/batch-sending.test.js b/ghost/core/test/integration/services/email-service/batch-sending.test.js
new file mode 100644
index 0000000000..2a11783613
--- /dev/null
+++ b/ghost/core/test/integration/services/email-service/batch-sending.test.js
@@ -0,0 +1,390 @@
+const {agentProvider, fixtureManager, mockManager} = require('../../../utils/e2e-framework');
+const moment = require('moment');
+const ObjectId = require('bson-objectid').default;
+const models = require('../../../../core/server/models');
+const sinon = require('sinon');
+const assert = require('assert');
+const MailgunClient = require('@tryghost/mailgun-client/lib/mailgun-client');
+const jobManager = require('../../../../core/server/services/jobs/job-service');
+let agent;
+const _ = require('lodash');
+const {MailgunEmailProvider} = require('@tryghost/email-service');
+
+const mobileDocWithPaywall = '{"version":"0.3.1","markups":[],"atoms":[],"cards":[["paywall",{}]],"sections":[[1,"p",[[0,[],0,"Free content"]]],[10,0],[1,"p",[[0,[],0,"Members content"]]]]}';
+
+async function createPublishedPostEmail(settings = {}, email_recipient_filter) {
+ const post = {
+ title: 'A random test post',
+ status: 'draft',
+ feature_image_alt: 'Testing sending',
+ feature_image_caption: 'Testing feature image caption',
+ created_at: moment().subtract(2, 'days').toISOString(),
+ updated_at: moment().subtract(2, 'days').toISOString(),
+ created_by: ObjectId().toHexString(),
+ updated_by: ObjectId().toHexString(),
+ ...settings
+ };
+
+ const res = await agent.post('posts/')
+ .body({posts: [post]})
+ .expectStatus(201);
+
+ const id = res.body.posts[0].id;
+
+ const updatedPost = {
+ status: 'published',
+ updated_at: res.body.posts[0].updated_at
+ };
+
+ const newsletterSlug = fixtureManager.get('newsletters', 0).slug;
+ await agent.put(`posts/${id}/?newsletter=${newsletterSlug}${email_recipient_filter ? `&email_segment=${email_recipient_filter}` : ''}`)
+ .body({posts: [updatedPost]})
+ .expectStatus(200);
+
+ const emailModel = await models.Email.findOne({
+ post_id: id
+ });
+ assert(!!emailModel);
+
+ return emailModel;
+}
+
+async function retryEmail(emailId) {
+ await agent.put(`emails/${emailId}/retry`)
+ .expectStatus(200);
+}
+
+describe('Batch sending tests', function () {
+ let stubbedSend;
+
+ beforeEach(function () {
+ stubbedSend = async function () {
+ return {
+ id: 'stubbed-email-id'
+ };
+ };
+ });
+
+ before(async function () {
+ mockManager.mockSetting('mailgun_api_key', 'test');
+ mockManager.mockSetting('mailgun_domain', 'example.com');
+ mockManager.mockSetting('mailgun_base_url', 'test');
+
+ // We need to stub the Mailgun client before starting Ghost
+ sinon.stub(MailgunClient.prototype, 'getInstance').returns({
+ // @ts-ignore
+ messages: {
+ create: async () => {
+ return await stubbedSend();
+ }
+ }
+ });
+
+ agent = await agentProvider.getAdminAPIAgent();
+ await fixtureManager.init('newsletters', 'members:newsletters');
+ await agent.loginAsOwner();
+ });
+
+ after(function () {
+ mockManager.restore();
+ });
+
+ it('Can send a scheduled post email', async function () {
+ // Prepare a post and email model
+ const completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
+ const emailModel = await createPublishedPostEmail();
+
+ assert.equal(emailModel.get('source_type'), 'mobiledoc');
+ assert(emailModel.get('subject'));
+ assert(emailModel.get('from'));
+
+ // Await sending job
+ await completedPromise;
+
+ await emailModel.refresh();
+ assert.equal(emailModel.get('status'), 'submitted');
+ assert.equal(emailModel.get('email_count'), 4);
+
+ // Did we create batches?
+ const batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`});
+ assert.equal(batches.models.length, 1);
+
+ // Check all batches are in send state
+ for (const batch of batches.models) {
+ assert.equal(batch.get('provider_id'), 'stubbed-email-id');
+ assert.equal(batch.get('status'), 'submitted');
+ assert.equal(batch.get('member_segment'), null);
+
+ assert.equal(batch.get('error_status_code'), null);
+ assert.equal(batch.get('error_message'), null);
+ assert.equal(batch.get('error_data'), null);
+ }
+
+ // Did we create recipients?
+ const emailRecipients = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}`});
+ assert.equal(emailRecipients.models.length, 4);
+
+ for (const recipient of emailRecipients.models) {
+ assert.equal(recipient.get('batch_id'), batches.models[0].id);
+ }
+
+ // Check members are unique
+ const memberIds = emailRecipients.models.map(recipient => recipient.get('member_id'));
+ assert.equal(memberIds.length, _.uniq(memberIds).length);
+ });
+
+ it('Splits recipients in free and paid batch', async function () {
+ // Prepare a post and email model
+ const completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
+ const emailModel = await createPublishedPostEmail({
+ // Requires a paywall
+ mobiledoc: mobileDocWithPaywall,
+ // Required to trigger the paywall
+ visibility: 'paid'
+ });
+
+ assert.equal(emailModel.get('source_type'), 'mobiledoc');
+ assert(emailModel.get('subject'));
+ assert(emailModel.get('from'));
+
+ // Await sending job
+ await completedPromise;
+
+ await emailModel.refresh();
+ assert(emailModel.get('status'), 'submitted');
+ assert.equal(emailModel.get('email_count'), 4);
+
+ // Did we create batches?
+ const batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`});
+ assert.equal(batches.models.length, 2);
+
+ // Check all batches are in send state
+ const firstBatch = batches.models[0];
+ assert.equal(firstBatch.get('provider_id'), 'stubbed-email-id');
+ assert.equal(firstBatch.get('status'), 'submitted');
+ assert.equal(firstBatch.get('member_segment'), 'status:free');
+ assert.equal(firstBatch.get('error_status_code'), null);
+ assert.equal(firstBatch.get('error_message'), null);
+ assert.equal(firstBatch.get('error_data'), null);
+
+ const secondBatch = batches.models[1];
+ assert.equal(secondBatch.get('provider_id'), 'stubbed-email-id');
+ assert.equal(secondBatch.get('status'), 'submitted');
+ assert.equal(secondBatch.get('member_segment'), 'status:-free');
+ assert.equal(secondBatch.get('error_status_code'), null);
+ assert.equal(secondBatch.get('error_message'), null);
+ assert.equal(secondBatch.get('error_data'), null);
+
+ // Did we create recipients?
+ const emailRecipientsFirstBatch = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}+batch_id:${firstBatch.id}`});
+ assert.equal(emailRecipientsFirstBatch.models.length, 2);
+
+ const emailRecipientsSecondBatch = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}+batch_id:${secondBatch.id}`});
+ assert.equal(emailRecipientsSecondBatch.models.length, 2);
+
+ // Check members are unique
+ const memberIds = [...emailRecipientsFirstBatch.models, ...emailRecipientsSecondBatch.models].map(recipient => recipient.get('member_id'));
+ assert.equal(memberIds.length, _.uniq(memberIds).length);
+ });
+
+ it('Only sends to members in email recipient filter', async function () {
+ // Prepare a post and email model
+ const completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
+ const emailModel = await createPublishedPostEmail({
+ // Requires a paywall
+ mobiledoc: mobileDocWithPaywall,
+ // Required to trigger the paywall
+ visibility: 'paid'
+ }, 'status:-free');
+
+ assert.equal(emailModel.get('source_type'), 'mobiledoc');
+ assert(emailModel.get('subject'));
+ assert(emailModel.get('from'));
+
+ // Await sending job
+ await completedPromise;
+
+ await emailModel.refresh();
+ assert.equal(emailModel.get('status'), 'submitted');
+ assert.equal(emailModel.get('email_count'), 2);
+
+ // Did we create batches?
+ const batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`});
+ assert.equal(batches.models.length, 1);
+
+ // Check all batches are in send state
+ const firstBatch = batches.models[0];
+ assert.equal(firstBatch.get('provider_id'), 'stubbed-email-id');
+ assert.equal(firstBatch.get('status'), 'submitted');
+ assert.equal(firstBatch.get('member_segment'), 'status:-free');
+ assert.equal(firstBatch.get('error_status_code'), null);
+ assert.equal(firstBatch.get('error_message'), null);
+ assert.equal(firstBatch.get('error_data'), null);
+
+ // Did we create recipients?
+ const emailRecipients = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}`});
+ assert.equal(emailRecipients.models.length, 2);
+
+ // Check members are unique
+ const memberIds = emailRecipients.models.map(recipient => recipient.get('member_id'));
+ assert.equal(_.uniq(memberIds).length, 2);
+ });
+
+ it('Splits up in batches according to email provider batch size', async function () {
+ MailgunEmailProvider.BATCH_SIZE = 1;
+
+ // Prepare a post and email model
+ const completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
+ const emailModel = await createPublishedPostEmail();
+
+ assert.equal(emailModel.get('source_type'), 'mobiledoc');
+ assert(emailModel.get('subject'));
+ assert(emailModel.get('from'));
+
+ // Await sending job
+ await completedPromise;
+
+ await emailModel.refresh();
+ assert.equal(emailModel.get('status'), 'submitted');
+ assert.equal(emailModel.get('email_count'), 4);
+
+ // Did we create batches?
+ const batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`});
+ assert.equal(batches.models.length, 4);
+
+ const emailRecipients = [];
+
+ // Check all batches are in send state
+ for (const batch of batches.models) {
+ assert.equal(batch.get('provider_id'), 'stubbed-email-id');
+ assert.equal(batch.get('status'), 'submitted');
+ assert.equal(batch.get('member_segment'), null);
+
+ assert.equal(batch.get('error_status_code'), null);
+ assert.equal(batch.get('error_message'), null);
+ assert.equal(batch.get('error_data'), null);
+
+ // Did we create recipients?
+ const batchRecipients = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}+batch_id:${batch.id}`});
+ assert.equal(batchRecipients.models.length, 1);
+
+ emailRecipients.push(...batchRecipients.models);
+ }
+
+ // Check members are unique
+ const memberIds = emailRecipients.map(recipient => recipient.get('member_id'));
+ assert.equal(memberIds.length, _.uniq(memberIds).length);
+ });
+
+ it('One failed batch marks the email as failed and allows for a retry', async function () {
+ MailgunEmailProvider.BATCH_SIZE = 1;
+ let counter = 0;
+ stubbedSend = async function () {
+ counter += 1;
+ if (counter === 4) {
+ throw {
+ status: 500,
+ message: 'Internal server error',
+ details: 'Something went wrong'
+ };
+ }
+ return {
+ id: 'stubbed-email-id-' + counter
+ };
+ };
+
+ // Prepare a post and email model
+ let completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
+ const emailModel = await createPublishedPostEmail();
+
+ assert.equal(emailModel.get('source_type'), 'mobiledoc');
+ assert(emailModel.get('subject'));
+ assert(emailModel.get('from'));
+
+ // Await sending job
+ await completedPromise;
+
+ await emailModel.refresh();
+ assert.equal(emailModel.get('status'), 'failed');
+ assert.equal(emailModel.get('email_count'), 4);
+
+ // Did we create batches?
+ let batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`, order: 'provider_id ASC'});
+ assert.equal(batches.models.length, 4);
+
+ let emailRecipients = [];
+
+ // Check all batches are in send state
+ let count = 0;
+ for (const batch of batches.models) {
+ count += 1;
+
+ if (count === 4) {
+ assert.equal(batch.get('provider_id'), null);
+ assert.equal(batch.get('status'), 'failed');
+ assert.equal(batch.get('error_status_code'), 500);
+ assert.equal(batch.get('error_message'), 'Internal server error:Something went wrong');
+ const errorData = JSON.parse(batch.get('error_data'));
+ assert.equal(errorData.error.status, 500);
+ assert.deepEqual(errorData.messageData.to.length, 1);
+ } else {
+ assert.equal(batch.get('provider_id'), 'stubbed-email-id-' + count);
+ assert.equal(batch.get('status'), 'submitted');
+ assert.equal(batch.get('error_status_code'), null);
+ assert.equal(batch.get('error_message'), null);
+ assert.equal(batch.get('error_data'), null);
+ }
+
+ assert.equal(batch.get('member_segment'), null);
+
+ // Did we create recipients?
+ const batchRecipients = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}+batch_id:${batch.id}`});
+ assert.equal(batchRecipients.models.length, 1);
+
+ emailRecipients.push(...batchRecipients.models);
+ }
+
+ // Check members are unique
+ let memberIds = emailRecipients.map(recipient => recipient.get('member_id'));
+ assert.equal(memberIds.length, _.uniq(memberIds).length);
+
+ completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
+ await retryEmail(emailModel.id);
+ await completedPromise;
+
+ await emailModel.refresh();
+ batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`, order: 'provider_id ASC'});
+ assert.equal(emailModel.get('status'), 'submitted');
+ assert.equal(emailModel.get('email_count'), 4);
+
+ // Did we keep the batches?
+ batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`, order: 'provider_id ASC'});
+ assert.equal(batches.models.length, 4);
+
+ emailRecipients = [];
+
+ // Check all batches are in send state
+ for (const batch of batches.models) {
+ assert(!!batch.get('provider_id'));
+ assert.equal(batch.get('status'), 'submitted');
+ assert.equal(batch.get('member_segment'), null);
+
+ assert.equal(batch.get('error_status_code'), null);
+ assert.equal(batch.get('error_message'), null);
+ assert.equal(batch.get('error_data'), null);
+
+ // Did we create recipients?
+ const batchRecipients = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}+batch_id:${batch.id}`});
+ assert.equal(batchRecipients.models.length, 1);
+
+ emailRecipients.push(...batchRecipients.models);
+ }
+
+ // Check members are unique
+ memberIds = emailRecipients.map(recipient => recipient.get('member_id'));
+ assert.equal(memberIds.length, _.uniq(memberIds).length);
+ });
+
+ // TODO: Link tracking
+ // TODO: Replacement fallbacks
+});
diff --git a/ghost/core/test/utils/e2e-framework-mock-manager.js b/ghost/core/test/utils/e2e-framework-mock-manager.js
index 915a482637..550b96c732 100644
--- a/ghost/core/test/utils/e2e-framework-mock-manager.js
+++ b/ghost/core/test/utils/e2e-framework-mock-manager.js
@@ -15,6 +15,7 @@ let emailCount = 0;
const mailService = require('../../core/server/services/mail/index');
const labs = require('../../core/shared/labs');
const events = require('../../core/server/lib/common/events');
+const settingsCache = require('../../core/shared/settings-cache');
let fakedLabsFlags = {};
const originalLabsIsSet = labs.isSet;
@@ -111,6 +112,29 @@ const emittedEvent = (name) => {
sinon.assert.calledWith(mocks.events, name);
};
+/**
+ * Settings Mocks
+ */
+
+let fakedSettings = {};
+const originalSettingsGetter = settingsCache.get;
+
+const fakeSettingsGetter = (setting) => {
+ if (fakedSettings.hasOwnProperty(setting)) {
+ return fakedSettings[setting];
+ }
+
+ return originalSettingsGetter(setting);
+};
+
+const mockSetting = (key, value) => {
+ if (!mocks.settings) {
+ mocks.settings = sinon.stub(settingsCache, 'get').callsFake(fakeSettingsGetter);
+ }
+
+ fakedSettings[key] = value;
+};
+
/**
* Labs Mocks
*/
@@ -154,6 +178,7 @@ const restore = () => {
sinon.restore();
mocks = {};
fakedLabsFlags = {};
+ fakedSettings = {};
emailCount = 0;
nock.cleanAll();
nock.enableNetConnect();
@@ -171,6 +196,7 @@ module.exports = {
mockLabsEnabled,
mockLabsDisabled,
mockWebhookRequests,
+ mockSetting,
restore,
assert: {
sentEmailCount,
diff --git a/ghost/email-service/lib/batch-sending-service.js b/ghost/email-service/lib/batch-sending-service.js
index 291830d522..f032093a25 100644
--- a/ghost/email-service/lib/batch-sending-service.js
+++ b/ghost/email-service/lib/batch-sending-service.js
@@ -64,6 +64,7 @@ class BatchSendingService {
*/
scheduleEmail(email) {
return this.#jobsService.addJob({
+ name: 'batch-sending-service-job',
job: this.emailJob.bind(this),
data: {emailId: email.id},
offloaded: false
@@ -144,7 +145,7 @@ class BatchSendingService {
const segments = this.#emailRenderer.getSegments(post);
const batches = [];
- const BATCH_SIZE = 500;
+ const BATCH_SIZE = this.#sendingService.getMaximumRecipients();
let totalCount = 0;
for (const segment of segments) {
@@ -160,7 +161,9 @@ class BatchSendingService {
logging.info(`Fetching members batch for email ${email.id} segment ${segment}, lastId: ${lastId}`);
const filter = segmentFilter + (lastId ? `+id:<${lastId}` : '');
- members = await this.#models.Member.getFilteredCollectionQuery({filter, order: 'id DESC'}).select('members.id', 'members.uuid', 'members.email', 'members.name').limit(BATCH_SIZE + 1);
+ members = await this.#models.Member.getFilteredCollectionQuery({filter})
+ .orderByRaw('id DESC')
+ .select('members.id', 'members.uuid', 'members.email', 'members.name').limit(BATCH_SIZE + 1);
if (members.length > BATCH_SIZE) {
lastId = members[members.length - 2].id;
@@ -271,13 +274,13 @@ class BatchSendingService {
* @param {{email: Email, batch: EmailBatch, post: Post, newsletter: Newsletter}} data
* @returns {Promise} True when succeeded, false when failed with an error
*/
- async sendBatch({email, batch, post, newsletter}) {
- logging.info(`Sending batch ${batch.id} for email ${email.id}`);
+ async sendBatch({email, batch: originalBatch, post, newsletter}) {
+ logging.info(`Sending batch ${originalBatch.id} for email ${email.id}`);
// Check the status of the email batch in a 'for update' transaction
- batch = await this.updateStatusLock(this.#models.EmailBatch, batch.id, 'submitting', ['pending', 'failed']);
+ const batch = await this.updateStatusLock(this.#models.EmailBatch, originalBatch.id, 'submitting', ['pending', 'failed']);
if (!batch) {
- logging.error(`Tried sending email batch that is not pending or failed ${batch.id}; status: ${batch.get('status')}`);
+ logging.error(`Tried sending email batch that is not pending or failed ${originalBatch.id}`);
return true;
}
diff --git a/ghost/email-service/lib/mailgun-email-provider.js b/ghost/email-service/lib/mailgun-email-provider.js
index 3109c607a5..7087bfc573 100644
--- a/ghost/email-service/lib/mailgun-email-provider.js
+++ b/ghost/email-service/lib/mailgun-email-provider.js
@@ -144,16 +144,29 @@ class MailgunEmailProvider {
return {
id: response.id.trim().replace(/^<|>$/g, '')
};
- } catch ({error, messageData}) {
- // REF: possible mailgun errors https://documentation.mailgun.com/en/latest/api-intro.html#status-codes
- let ghostError = new errors.EmailError({
- statusCode: error.status,
- message: this.#createMailgunErrorMessage(error),
- errorDetails: JSON.stringify({error, messageData}),
- context: `Mailgun Error ${error.status}: ${error.details}`,
- help: `https://ghost.org/docs/newsletters/#bulk-email-configuration`,
- code: 'BULK_EMAIL_SEND_FAILED'
- });
+ } catch (e) {
+ let ghostError;
+ if (e.error && e.messageData) {
+ const {error, messageData} = e;
+
+ // REF: possible mailgun errors https://documentation.mailgun.com/en/latest/api-intro.html#status-codes
+ ghostError = new errors.EmailError({
+ statusCode: error.status,
+ message: this.#createMailgunErrorMessage(error),
+ errorDetails: JSON.stringify({error, messageData}),
+ context: `Mailgun Error ${error.status}: ${error.details}`,
+ help: `https://ghost.org/docs/newsletters/#bulk-email-configuration`,
+ code: 'BULK_EMAIL_SEND_FAILED'
+ });
+ } else {
+ ghostError = new errors.EmailError({
+ statusCode: undefined,
+ message: e.message,
+ errorDetails: undefined,
+ context: e.context || 'Mailgun Error',
+ code: 'BULK_EMAIL_SEND_FAILED'
+ });
+ }
logging.warn(ghostError);
debug(`failed to send message (${Date.now() - startTime}ms)`);
@@ -163,6 +176,10 @@ class MailgunEmailProvider {
throw ghostError;
}
}
+
+ getMaximumRecipients() {
+ return MailgunEmailProvider.BATCH_SIZE;
+ }
}
module.exports = MailgunEmailProvider;
diff --git a/ghost/email-service/lib/sending-service.js b/ghost/email-service/lib/sending-service.js
index 6a30782982..2a0c10533e 100644
--- a/ghost/email-service/lib/sending-service.js
+++ b/ghost/email-service/lib/sending-service.js
@@ -11,6 +11,7 @@
*
* @typedef {object} IEmailProviderService
* @prop {(emailData: EmailData, options: EmailSendingOptions) => Promise} send
+ * @prop {() => number} getMaximumRecipients
*
* @typedef {object} Post
* @typedef {object} Newsletter
@@ -65,6 +66,10 @@ class SendingService {
this.#emailRenderer = emailRenderer;
}
+ getMaximumRecipients() {
+ return this.#emailProvider.getMaximumRecipients();
+ }
+
/**
* Send a given post, rendered for a given newsletter and segment to the members provided in the list
* @param {object} data
diff --git a/ghost/job-manager/README.md b/ghost/job-manager/README.md
index cc63356223..b9be8ac308 100644
--- a/ghost/job-manager/README.md
+++ b/ghost/job-manager/README.md
@@ -61,7 +61,7 @@ jobsService.addOneOffJob({
// optionally await completion of the one-off job in case
// there are state changes expected to execute the rest of the process
-await jobsService.awaitCompletion('members-migrations');
+await jobsService.awaitOneOffCompletion('members-migrations');
// check if previously registered one-off job has been executed
// successfully - it exists and doesn't have a "failed" state.
diff --git a/ghost/job-manager/lib/job-manager.js b/ghost/job-manager/lib/job-manager.js
index a2878386c8..029f8cabf9 100644
--- a/ghost/job-manager/lib/job-manager.js
+++ b/ghost/job-manager/lib/job-manager.js
@@ -29,6 +29,7 @@ const ALL_STATUSES = {
class JobManager {
#domainEvents;
+ #completionPromises = new Map();
/**
* @param {Object} options
@@ -94,24 +95,37 @@ class JobManager {
}
async _jobMessageHandler({name, message}) {
- if (this._jobsRepository && name) {
+ if (name) {
if (message === ALL_STATUSES.started) {
- const job = await this._jobsRepository.read(name);
+ if (this._jobsRepository) {
+ const job = await this._jobsRepository.read(name);
- if (job) {
- await this._jobsRepository.update(job.id, {
- status: ALL_STATUSES.started,
- started_at: new Date()
- });
+ if (job) {
+ await this._jobsRepository.update(job.id, {
+ status: ALL_STATUSES.started,
+ started_at: new Date()
+ });
+ }
}
} else if (message === 'done') {
- const job = await this._jobsRepository.read(name);
+ if (this._jobsRepository) {
+ const job = await this._jobsRepository.read(name);
- if (job) {
- await this._jobsRepository.update(job.id, {
- status: ALL_STATUSES.finished,
- finished_at: new Date()
- });
+ if (job) {
+ await this._jobsRepository.update(job.id, {
+ status: ALL_STATUSES.finished,
+ finished_at: new Date()
+ });
+ }
+ }
+
+ // Check completion listeners
+ if (this.#completionPromises.has(name)) {
+ for (const listeners of this.#completionPromises.get(name)) {
+ listeners.resolve();
+ }
+ // Clear the listeners
+ this.#completionPromises.delete(name);
}
} else {
if (typeof message === 'object' && this.#domainEvents) {
@@ -134,6 +148,15 @@ class JobManager {
});
}
}
+
+ // Check completion listeners and call them with error
+ if (this.#completionPromises.has(jobMeta.name)) {
+ for (const listeners of this.#completionPromises.get(jobMeta.name)) {
+ listeners.reject(error);
+ }
+ // Clear the listeners
+ this.#completionPromises.delete(jobMeta.name);
+ }
}
/**
@@ -280,12 +303,12 @@ class JobManager {
}
/**
- * Awaits completion of the offloaded job.
+ * Awaits completion of the offloaded one-off job.
* CAUTION: it might take a long time to resolve!
* @param {String} name one-off job name
* @returns resolves with a Job model at current state
*/
- async awaitCompletion(name) {
+ async awaitOneOffCompletion(name) {
const persistedJob = await this._jobsRepository.read({
name
});
@@ -294,12 +317,28 @@ class JobManager {
// NOTE: can implement exponential backoff here if that's ever needed
await setTimeoutPromise(500);
- return this.awaitCompletion(name);
+ return this.awaitOneOffCompletion(name);
}
return persistedJob;
}
+ /***
+ * Create this promise before you add the job you want to listen for. Then await the returned promise.
+ * Resolves if the job has been executed successfully.
+ * Throws an error if the job has failed execution.
+ */
+ async awaitCompletion(name) {
+ const promise = new Promise((resolve, reject) => {
+ this.#completionPromises.set(name, [
+ ...(this.#completionPromises.get(name) ?? []),
+ {resolve, reject}
+ ]);
+ });
+
+ return promise;
+ }
+
/**
* Removes an "offloaded" job from scheduled jobs queue.
* It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68).
diff --git a/ghost/job-manager/test/job-manager.test.js b/ghost/job-manager/test/job-manager.test.js
index 205298cced..c1b954bf65 100644
--- a/ghost/job-manager/test/job-manager.test.js
+++ b/ghost/job-manager/test/job-manager.test.js
@@ -37,7 +37,7 @@ describe('Job Manager', function () {
should.exist(jobManager.addJob);
should.exist(jobManager.hasExecutedSuccessfully);
- should.exist(jobManager.awaitCompletion);
+ should.exist(jobManager.awaitOneOffCompletion);
});
describe('Add a job', function () {
@@ -641,7 +641,7 @@ describe('Job Manager', function () {
});
should.equal(spy.called, false);
- await jobManager.awaitCompletion('solovei');
+ await jobManager.awaitOneOffCompletion('solovei');
should.equal(spy.called, true);
});
});