Added E2E tests for batch sending (#15910)

refs https://github.com/TryGhost/Team/issues/2339

- Includes a new pattern in the job manager that allows us to properly
await jobs.
- Added new convenience mocking methods to stub settings
- Tests the main flows for bulk sending:
    - Sending in multiple batches
    - Sending to multiple segments
    - Handling a failed batch and retrying that batch
- Fixes bug in batch generation (ordering not working)

In a different PR I'll add more detailed tests.
This commit is contained in:
Simon Backx 2022-12-01 13:43:49 +01:00 committed by GitHub
parent a5ca64f693
commit 4c166e11df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 518 additions and 38 deletions

View File

@ -153,7 +153,7 @@ module.exports = {
job: stripeService.migrations.execute.bind(stripeService.migrations) job: stripeService.migrations.execute.bind(stripeService.migrations)
}); });
await jobsService.awaitCompletion(membersMigrationJobName); await jobsService.awaitOneOffCompletion(membersMigrationJobName);
} }
} }

View File

@ -150,13 +150,13 @@ describe('Members API — exportCSV', function () {
}) })
}); });
const labelsList = labels.map(label => label.get('name')).join(','); const labelsList = labels.map(label => label.get('name')).sort().join(',');
await testOutput(member, (row) => { await testOutput(member, (row) => {
basicAsserts(member, row); basicAsserts(member, row);
should(row.subscribed_to_emails).eql('false'); should(row.subscribed_to_emails).eql('false');
should(row.complimentary_plan).eql(''); should(row.complimentary_plan).eql('');
should(row.labels).eql(labelsList); should(row.labels.split(',').sort().join(',')).eql(labelsList);
should(row.tiers).eql(''); should(row.tiers).eql('');
}, [`filter=label:${labels[0].get('slug')}`, 'filter=subscribed:false']); }, [`filter=label:${labels[0].get('slug')}`, 'filter=subscribed:false']);
}); });

View File

@ -0,0 +1,390 @@
const {agentProvider, fixtureManager, mockManager} = require('../../../utils/e2e-framework');
const moment = require('moment');
const ObjectId = require('bson-objectid').default;
const models = require('../../../../core/server/models');
const sinon = require('sinon');
const assert = require('assert');
const MailgunClient = require('@tryghost/mailgun-client/lib/mailgun-client');
const jobManager = require('../../../../core/server/services/jobs/job-service');
let agent;
const _ = require('lodash');
const {MailgunEmailProvider} = require('@tryghost/email-service');
const mobileDocWithPaywall = '{"version":"0.3.1","markups":[],"atoms":[],"cards":[["paywall",{}]],"sections":[[1,"p",[[0,[],0,"Free content"]]],[10,0],[1,"p",[[0,[],0,"Members content"]]]]}';
async function createPublishedPostEmail(settings = {}, email_recipient_filter) {
const post = {
title: 'A random test post',
status: 'draft',
feature_image_alt: 'Testing sending',
feature_image_caption: 'Testing <b>feature image caption</b>',
created_at: moment().subtract(2, 'days').toISOString(),
updated_at: moment().subtract(2, 'days').toISOString(),
created_by: ObjectId().toHexString(),
updated_by: ObjectId().toHexString(),
...settings
};
const res = await agent.post('posts/')
.body({posts: [post]})
.expectStatus(201);
const id = res.body.posts[0].id;
const updatedPost = {
status: 'published',
updated_at: res.body.posts[0].updated_at
};
const newsletterSlug = fixtureManager.get('newsletters', 0).slug;
await agent.put(`posts/${id}/?newsletter=${newsletterSlug}${email_recipient_filter ? `&email_segment=${email_recipient_filter}` : ''}`)
.body({posts: [updatedPost]})
.expectStatus(200);
const emailModel = await models.Email.findOne({
post_id: id
});
assert(!!emailModel);
return emailModel;
}
async function retryEmail(emailId) {
await agent.put(`emails/${emailId}/retry`)
.expectStatus(200);
}
describe('Batch sending tests', function () {
let stubbedSend;
beforeEach(function () {
stubbedSend = async function () {
return {
id: 'stubbed-email-id'
};
};
});
before(async function () {
mockManager.mockSetting('mailgun_api_key', 'test');
mockManager.mockSetting('mailgun_domain', 'example.com');
mockManager.mockSetting('mailgun_base_url', 'test');
// We need to stub the Mailgun client before starting Ghost
sinon.stub(MailgunClient.prototype, 'getInstance').returns({
// @ts-ignore
messages: {
create: async () => {
return await stubbedSend();
}
}
});
agent = await agentProvider.getAdminAPIAgent();
await fixtureManager.init('newsletters', 'members:newsletters');
await agent.loginAsOwner();
});
after(function () {
mockManager.restore();
});
it('Can send a scheduled post email', async function () {
// Prepare a post and email model
const completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
const emailModel = await createPublishedPostEmail();
assert.equal(emailModel.get('source_type'), 'mobiledoc');
assert(emailModel.get('subject'));
assert(emailModel.get('from'));
// Await sending job
await completedPromise;
await emailModel.refresh();
assert.equal(emailModel.get('status'), 'submitted');
assert.equal(emailModel.get('email_count'), 4);
// Did we create batches?
const batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`});
assert.equal(batches.models.length, 1);
// Check all batches are in send state
for (const batch of batches.models) {
assert.equal(batch.get('provider_id'), 'stubbed-email-id');
assert.equal(batch.get('status'), 'submitted');
assert.equal(batch.get('member_segment'), null);
assert.equal(batch.get('error_status_code'), null);
assert.equal(batch.get('error_message'), null);
assert.equal(batch.get('error_data'), null);
}
// Did we create recipients?
const emailRecipients = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}`});
assert.equal(emailRecipients.models.length, 4);
for (const recipient of emailRecipients.models) {
assert.equal(recipient.get('batch_id'), batches.models[0].id);
}
// Check members are unique
const memberIds = emailRecipients.models.map(recipient => recipient.get('member_id'));
assert.equal(memberIds.length, _.uniq(memberIds).length);
});
it('Splits recipients in free and paid batch', async function () {
// Prepare a post and email model
const completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
const emailModel = await createPublishedPostEmail({
// Requires a paywall
mobiledoc: mobileDocWithPaywall,
// Required to trigger the paywall
visibility: 'paid'
});
assert.equal(emailModel.get('source_type'), 'mobiledoc');
assert(emailModel.get('subject'));
assert(emailModel.get('from'));
// Await sending job
await completedPromise;
await emailModel.refresh();
assert(emailModel.get('status'), 'submitted');
assert.equal(emailModel.get('email_count'), 4);
// Did we create batches?
const batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`});
assert.equal(batches.models.length, 2);
// Check all batches are in send state
const firstBatch = batches.models[0];
assert.equal(firstBatch.get('provider_id'), 'stubbed-email-id');
assert.equal(firstBatch.get('status'), 'submitted');
assert.equal(firstBatch.get('member_segment'), 'status:free');
assert.equal(firstBatch.get('error_status_code'), null);
assert.equal(firstBatch.get('error_message'), null);
assert.equal(firstBatch.get('error_data'), null);
const secondBatch = batches.models[1];
assert.equal(secondBatch.get('provider_id'), 'stubbed-email-id');
assert.equal(secondBatch.get('status'), 'submitted');
assert.equal(secondBatch.get('member_segment'), 'status:-free');
assert.equal(secondBatch.get('error_status_code'), null);
assert.equal(secondBatch.get('error_message'), null);
assert.equal(secondBatch.get('error_data'), null);
// Did we create recipients?
const emailRecipientsFirstBatch = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}+batch_id:${firstBatch.id}`});
assert.equal(emailRecipientsFirstBatch.models.length, 2);
const emailRecipientsSecondBatch = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}+batch_id:${secondBatch.id}`});
assert.equal(emailRecipientsSecondBatch.models.length, 2);
// Check members are unique
const memberIds = [...emailRecipientsFirstBatch.models, ...emailRecipientsSecondBatch.models].map(recipient => recipient.get('member_id'));
assert.equal(memberIds.length, _.uniq(memberIds).length);
});
it('Only sends to members in email recipient filter', async function () {
// Prepare a post and email model
const completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
const emailModel = await createPublishedPostEmail({
// Requires a paywall
mobiledoc: mobileDocWithPaywall,
// Required to trigger the paywall
visibility: 'paid'
}, 'status:-free');
assert.equal(emailModel.get('source_type'), 'mobiledoc');
assert(emailModel.get('subject'));
assert(emailModel.get('from'));
// Await sending job
await completedPromise;
await emailModel.refresh();
assert.equal(emailModel.get('status'), 'submitted');
assert.equal(emailModel.get('email_count'), 2);
// Did we create batches?
const batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`});
assert.equal(batches.models.length, 1);
// Check all batches are in send state
const firstBatch = batches.models[0];
assert.equal(firstBatch.get('provider_id'), 'stubbed-email-id');
assert.equal(firstBatch.get('status'), 'submitted');
assert.equal(firstBatch.get('member_segment'), 'status:-free');
assert.equal(firstBatch.get('error_status_code'), null);
assert.equal(firstBatch.get('error_message'), null);
assert.equal(firstBatch.get('error_data'), null);
// Did we create recipients?
const emailRecipients = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}`});
assert.equal(emailRecipients.models.length, 2);
// Check members are unique
const memberIds = emailRecipients.models.map(recipient => recipient.get('member_id'));
assert.equal(_.uniq(memberIds).length, 2);
});
it('Splits up in batches according to email provider batch size', async function () {
MailgunEmailProvider.BATCH_SIZE = 1;
// Prepare a post and email model
const completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
const emailModel = await createPublishedPostEmail();
assert.equal(emailModel.get('source_type'), 'mobiledoc');
assert(emailModel.get('subject'));
assert(emailModel.get('from'));
// Await sending job
await completedPromise;
await emailModel.refresh();
assert.equal(emailModel.get('status'), 'submitted');
assert.equal(emailModel.get('email_count'), 4);
// Did we create batches?
const batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`});
assert.equal(batches.models.length, 4);
const emailRecipients = [];
// Check all batches are in send state
for (const batch of batches.models) {
assert.equal(batch.get('provider_id'), 'stubbed-email-id');
assert.equal(batch.get('status'), 'submitted');
assert.equal(batch.get('member_segment'), null);
assert.equal(batch.get('error_status_code'), null);
assert.equal(batch.get('error_message'), null);
assert.equal(batch.get('error_data'), null);
// Did we create recipients?
const batchRecipients = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}+batch_id:${batch.id}`});
assert.equal(batchRecipients.models.length, 1);
emailRecipients.push(...batchRecipients.models);
}
// Check members are unique
const memberIds = emailRecipients.map(recipient => recipient.get('member_id'));
assert.equal(memberIds.length, _.uniq(memberIds).length);
});
it('One failed batch marks the email as failed and allows for a retry', async function () {
MailgunEmailProvider.BATCH_SIZE = 1;
let counter = 0;
stubbedSend = async function () {
counter += 1;
if (counter === 4) {
throw {
status: 500,
message: 'Internal server error',
details: 'Something went wrong'
};
}
return {
id: 'stubbed-email-id-' + counter
};
};
// Prepare a post and email model
let completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
const emailModel = await createPublishedPostEmail();
assert.equal(emailModel.get('source_type'), 'mobiledoc');
assert(emailModel.get('subject'));
assert(emailModel.get('from'));
// Await sending job
await completedPromise;
await emailModel.refresh();
assert.equal(emailModel.get('status'), 'failed');
assert.equal(emailModel.get('email_count'), 4);
// Did we create batches?
let batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`, order: 'provider_id ASC'});
assert.equal(batches.models.length, 4);
let emailRecipients = [];
// Check all batches are in send state
let count = 0;
for (const batch of batches.models) {
count += 1;
if (count === 4) {
assert.equal(batch.get('provider_id'), null);
assert.equal(batch.get('status'), 'failed');
assert.equal(batch.get('error_status_code'), 500);
assert.equal(batch.get('error_message'), 'Internal server error:Something went wrong');
const errorData = JSON.parse(batch.get('error_data'));
assert.equal(errorData.error.status, 500);
assert.deepEqual(errorData.messageData.to.length, 1);
} else {
assert.equal(batch.get('provider_id'), 'stubbed-email-id-' + count);
assert.equal(batch.get('status'), 'submitted');
assert.equal(batch.get('error_status_code'), null);
assert.equal(batch.get('error_message'), null);
assert.equal(batch.get('error_data'), null);
}
assert.equal(batch.get('member_segment'), null);
// Did we create recipients?
const batchRecipients = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}+batch_id:${batch.id}`});
assert.equal(batchRecipients.models.length, 1);
emailRecipients.push(...batchRecipients.models);
}
// Check members are unique
let memberIds = emailRecipients.map(recipient => recipient.get('member_id'));
assert.equal(memberIds.length, _.uniq(memberIds).length);
completedPromise = jobManager.awaitCompletion('batch-sending-service-job');
await retryEmail(emailModel.id);
await completedPromise;
await emailModel.refresh();
batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`, order: 'provider_id ASC'});
assert.equal(emailModel.get('status'), 'submitted');
assert.equal(emailModel.get('email_count'), 4);
// Did we keep the batches?
batches = await models.EmailBatch.findAll({filter: `email_id:${emailModel.id}`, order: 'provider_id ASC'});
assert.equal(batches.models.length, 4);
emailRecipients = [];
// Check all batches are in send state
for (const batch of batches.models) {
assert(!!batch.get('provider_id'));
assert.equal(batch.get('status'), 'submitted');
assert.equal(batch.get('member_segment'), null);
assert.equal(batch.get('error_status_code'), null);
assert.equal(batch.get('error_message'), null);
assert.equal(batch.get('error_data'), null);
// Did we create recipients?
const batchRecipients = await models.EmailRecipient.findAll({filter: `email_id:${emailModel.id}+batch_id:${batch.id}`});
assert.equal(batchRecipients.models.length, 1);
emailRecipients.push(...batchRecipients.models);
}
// Check members are unique
memberIds = emailRecipients.map(recipient => recipient.get('member_id'));
assert.equal(memberIds.length, _.uniq(memberIds).length);
});
// TODO: Link tracking
// TODO: Replacement fallbacks
});

View File

@ -15,6 +15,7 @@ let emailCount = 0;
const mailService = require('../../core/server/services/mail/index'); const mailService = require('../../core/server/services/mail/index');
const labs = require('../../core/shared/labs'); const labs = require('../../core/shared/labs');
const events = require('../../core/server/lib/common/events'); const events = require('../../core/server/lib/common/events');
const settingsCache = require('../../core/shared/settings-cache');
let fakedLabsFlags = {}; let fakedLabsFlags = {};
const originalLabsIsSet = labs.isSet; const originalLabsIsSet = labs.isSet;
@ -111,6 +112,29 @@ const emittedEvent = (name) => {
sinon.assert.calledWith(mocks.events, name); sinon.assert.calledWith(mocks.events, name);
}; };
/**
* Settings Mocks
*/
let fakedSettings = {};
const originalSettingsGetter = settingsCache.get;
const fakeSettingsGetter = (setting) => {
if (fakedSettings.hasOwnProperty(setting)) {
return fakedSettings[setting];
}
return originalSettingsGetter(setting);
};
const mockSetting = (key, value) => {
if (!mocks.settings) {
mocks.settings = sinon.stub(settingsCache, 'get').callsFake(fakeSettingsGetter);
}
fakedSettings[key] = value;
};
/** /**
* Labs Mocks * Labs Mocks
*/ */
@ -154,6 +178,7 @@ const restore = () => {
sinon.restore(); sinon.restore();
mocks = {}; mocks = {};
fakedLabsFlags = {}; fakedLabsFlags = {};
fakedSettings = {};
emailCount = 0; emailCount = 0;
nock.cleanAll(); nock.cleanAll();
nock.enableNetConnect(); nock.enableNetConnect();
@ -171,6 +196,7 @@ module.exports = {
mockLabsEnabled, mockLabsEnabled,
mockLabsDisabled, mockLabsDisabled,
mockWebhookRequests, mockWebhookRequests,
mockSetting,
restore, restore,
assert: { assert: {
sentEmailCount, sentEmailCount,

View File

@ -64,6 +64,7 @@ class BatchSendingService {
*/ */
scheduleEmail(email) { scheduleEmail(email) {
return this.#jobsService.addJob({ return this.#jobsService.addJob({
name: 'batch-sending-service-job',
job: this.emailJob.bind(this), job: this.emailJob.bind(this),
data: {emailId: email.id}, data: {emailId: email.id},
offloaded: false offloaded: false
@ -144,7 +145,7 @@ class BatchSendingService {
const segments = this.#emailRenderer.getSegments(post); const segments = this.#emailRenderer.getSegments(post);
const batches = []; const batches = [];
const BATCH_SIZE = 500; const BATCH_SIZE = this.#sendingService.getMaximumRecipients();
let totalCount = 0; let totalCount = 0;
for (const segment of segments) { for (const segment of segments) {
@ -160,7 +161,9 @@ class BatchSendingService {
logging.info(`Fetching members batch for email ${email.id} segment ${segment}, lastId: ${lastId}`); logging.info(`Fetching members batch for email ${email.id} segment ${segment}, lastId: ${lastId}`);
const filter = segmentFilter + (lastId ? `+id:<${lastId}` : ''); const filter = segmentFilter + (lastId ? `+id:<${lastId}` : '');
members = await this.#models.Member.getFilteredCollectionQuery({filter, order: 'id DESC'}).select('members.id', 'members.uuid', 'members.email', 'members.name').limit(BATCH_SIZE + 1); members = await this.#models.Member.getFilteredCollectionQuery({filter})
.orderByRaw('id DESC')
.select('members.id', 'members.uuid', 'members.email', 'members.name').limit(BATCH_SIZE + 1);
if (members.length > BATCH_SIZE) { if (members.length > BATCH_SIZE) {
lastId = members[members.length - 2].id; lastId = members[members.length - 2].id;
@ -271,13 +274,13 @@ class BatchSendingService {
* @param {{email: Email, batch: EmailBatch, post: Post, newsletter: Newsletter}} data * @param {{email: Email, batch: EmailBatch, post: Post, newsletter: Newsletter}} data
* @returns {Promise<boolean>} True when succeeded, false when failed with an error * @returns {Promise<boolean>} True when succeeded, false when failed with an error
*/ */
async sendBatch({email, batch, post, newsletter}) { async sendBatch({email, batch: originalBatch, post, newsletter}) {
logging.info(`Sending batch ${batch.id} for email ${email.id}`); logging.info(`Sending batch ${originalBatch.id} for email ${email.id}`);
// Check the status of the email batch in a 'for update' transaction // Check the status of the email batch in a 'for update' transaction
batch = await this.updateStatusLock(this.#models.EmailBatch, batch.id, 'submitting', ['pending', 'failed']); const batch = await this.updateStatusLock(this.#models.EmailBatch, originalBatch.id, 'submitting', ['pending', 'failed']);
if (!batch) { if (!batch) {
logging.error(`Tried sending email batch that is not pending or failed ${batch.id}; status: ${batch.get('status')}`); logging.error(`Tried sending email batch that is not pending or failed ${originalBatch.id}`);
return true; return true;
} }

View File

@ -144,16 +144,29 @@ class MailgunEmailProvider {
return { return {
id: response.id.trim().replace(/^<|>$/g, '') id: response.id.trim().replace(/^<|>$/g, '')
}; };
} catch ({error, messageData}) { } catch (e) {
// REF: possible mailgun errors https://documentation.mailgun.com/en/latest/api-intro.html#status-codes let ghostError;
let ghostError = new errors.EmailError({ if (e.error && e.messageData) {
statusCode: error.status, const {error, messageData} = e;
message: this.#createMailgunErrorMessage(error),
errorDetails: JSON.stringify({error, messageData}), // REF: possible mailgun errors https://documentation.mailgun.com/en/latest/api-intro.html#status-codes
context: `Mailgun Error ${error.status}: ${error.details}`, ghostError = new errors.EmailError({
help: `https://ghost.org/docs/newsletters/#bulk-email-configuration`, statusCode: error.status,
code: 'BULK_EMAIL_SEND_FAILED' message: this.#createMailgunErrorMessage(error),
}); errorDetails: JSON.stringify({error, messageData}),
context: `Mailgun Error ${error.status}: ${error.details}`,
help: `https://ghost.org/docs/newsletters/#bulk-email-configuration`,
code: 'BULK_EMAIL_SEND_FAILED'
});
} else {
ghostError = new errors.EmailError({
statusCode: undefined,
message: e.message,
errorDetails: undefined,
context: e.context || 'Mailgun Error',
code: 'BULK_EMAIL_SEND_FAILED'
});
}
logging.warn(ghostError); logging.warn(ghostError);
debug(`failed to send message (${Date.now() - startTime}ms)`); debug(`failed to send message (${Date.now() - startTime}ms)`);
@ -163,6 +176,10 @@ class MailgunEmailProvider {
throw ghostError; throw ghostError;
} }
} }
getMaximumRecipients() {
return MailgunEmailProvider.BATCH_SIZE;
}
} }
module.exports = MailgunEmailProvider; module.exports = MailgunEmailProvider;

View File

@ -11,6 +11,7 @@
* *
* @typedef {object} IEmailProviderService * @typedef {object} IEmailProviderService
* @prop {(emailData: EmailData, options: EmailSendingOptions) => Promise<EmailProviderSuccessResponse>} send * @prop {(emailData: EmailData, options: EmailSendingOptions) => Promise<EmailProviderSuccessResponse>} send
* @prop {() => number} getMaximumRecipients
* *
* @typedef {object} Post * @typedef {object} Post
* @typedef {object} Newsletter * @typedef {object} Newsletter
@ -65,6 +66,10 @@ class SendingService {
this.#emailRenderer = emailRenderer; this.#emailRenderer = emailRenderer;
} }
getMaximumRecipients() {
return this.#emailProvider.getMaximumRecipients();
}
/** /**
* Send a given post, rendered for a given newsletter and segment to the members provided in the list * Send a given post, rendered for a given newsletter and segment to the members provided in the list
* @param {object} data * @param {object} data

View File

@ -61,7 +61,7 @@ jobsService.addOneOffJob({
// optionally await completion of the one-off job in case // optionally await completion of the one-off job in case
// there are state changes expected to execute the rest of the process // there are state changes expected to execute the rest of the process
await jobsService.awaitCompletion('members-migrations'); await jobsService.awaitOneOffCompletion('members-migrations');
// check if previously registered one-off job has been executed // check if previously registered one-off job has been executed
// successfully - it exists and doesn't have a "failed" state. // successfully - it exists and doesn't have a "failed" state.

View File

@ -29,6 +29,7 @@ const ALL_STATUSES = {
class JobManager { class JobManager {
#domainEvents; #domainEvents;
#completionPromises = new Map();
/** /**
* @param {Object} options * @param {Object} options
@ -94,24 +95,37 @@ class JobManager {
} }
async _jobMessageHandler({name, message}) { async _jobMessageHandler({name, message}) {
if (this._jobsRepository && name) { if (name) {
if (message === ALL_STATUSES.started) { if (message === ALL_STATUSES.started) {
const job = await this._jobsRepository.read(name); if (this._jobsRepository) {
const job = await this._jobsRepository.read(name);
if (job) { if (job) {
await this._jobsRepository.update(job.id, { await this._jobsRepository.update(job.id, {
status: ALL_STATUSES.started, status: ALL_STATUSES.started,
started_at: new Date() started_at: new Date()
}); });
}
} }
} else if (message === 'done') { } else if (message === 'done') {
const job = await this._jobsRepository.read(name); if (this._jobsRepository) {
const job = await this._jobsRepository.read(name);
if (job) { if (job) {
await this._jobsRepository.update(job.id, { await this._jobsRepository.update(job.id, {
status: ALL_STATUSES.finished, status: ALL_STATUSES.finished,
finished_at: new Date() finished_at: new Date()
}); });
}
}
// Check completion listeners
if (this.#completionPromises.has(name)) {
for (const listeners of this.#completionPromises.get(name)) {
listeners.resolve();
}
// Clear the listeners
this.#completionPromises.delete(name);
} }
} else { } else {
if (typeof message === 'object' && this.#domainEvents) { if (typeof message === 'object' && this.#domainEvents) {
@ -134,6 +148,15 @@ class JobManager {
}); });
} }
} }
// Check completion listeners and call them with error
if (this.#completionPromises.has(jobMeta.name)) {
for (const listeners of this.#completionPromises.get(jobMeta.name)) {
listeners.reject(error);
}
// Clear the listeners
this.#completionPromises.delete(jobMeta.name);
}
} }
/** /**
@ -280,12 +303,12 @@ class JobManager {
} }
/** /**
* Awaits completion of the offloaded job. * Awaits completion of the offloaded one-off job.
* CAUTION: it might take a long time to resolve! * CAUTION: it might take a long time to resolve!
* @param {String} name one-off job name * @param {String} name one-off job name
* @returns resolves with a Job model at current state * @returns resolves with a Job model at current state
*/ */
async awaitCompletion(name) { async awaitOneOffCompletion(name) {
const persistedJob = await this._jobsRepository.read({ const persistedJob = await this._jobsRepository.read({
name name
}); });
@ -294,12 +317,28 @@ class JobManager {
// NOTE: can implement exponential backoff here if that's ever needed // NOTE: can implement exponential backoff here if that's ever needed
await setTimeoutPromise(500); await setTimeoutPromise(500);
return this.awaitCompletion(name); return this.awaitOneOffCompletion(name);
} }
return persistedJob; return persistedJob;
} }
/***
* Create this promise before you add the job you want to listen for. Then await the returned promise.
* Resolves if the job has been executed successfully.
* Throws an error if the job has failed execution.
*/
async awaitCompletion(name) {
const promise = new Promise((resolve, reject) => {
this.#completionPromises.set(name, [
...(this.#completionPromises.get(name) ?? []),
{resolve, reject}
]);
});
return promise;
}
/** /**
* Removes an "offloaded" job from scheduled jobs queue. * Removes an "offloaded" job from scheduled jobs queue.
* It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68). * It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68).

View File

@ -37,7 +37,7 @@ describe('Job Manager', function () {
should.exist(jobManager.addJob); should.exist(jobManager.addJob);
should.exist(jobManager.hasExecutedSuccessfully); should.exist(jobManager.hasExecutedSuccessfully);
should.exist(jobManager.awaitCompletion); should.exist(jobManager.awaitOneOffCompletion);
}); });
describe('Add a job', function () { describe('Add a job', function () {
@ -641,7 +641,7 @@ describe('Job Manager', function () {
}); });
should.equal(spy.called, false); should.equal(spy.called, false);
await jobManager.awaitCompletion('solovei'); await jobManager.awaitOneOffCompletion('solovei');
should.equal(spy.called, true); should.equal(spy.called, true);
}); });
}); });