diff --git a/ghost/core/core/server/services/email-analytics/index.js b/ghost/core/core/server/services/email-analytics/index.js index 90c6cd247f..59587420f2 100644 --- a/ghost/core/core/server/services/email-analytics/index.js +++ b/ghost/core/core/server/services/email-analytics/index.js @@ -2,14 +2,20 @@ const config = require('../../../shared/config'); const db = require('../../data/db'); const settings = require('../../../shared/settings-cache'); const {EmailAnalyticsService} = require('@tryghost/email-analytics-service'); -const EventProcessor = require('./lib/event-processor'); +const {EmailEventProcessor} = require('@tryghost/email-service'); const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun'); const queries = require('./lib/queries'); +const DomainEvents = require('@tryghost/domain-events'); + +const eventProcessor = new EmailEventProcessor({ + domainEvents: DomainEvents, + db +}); module.exports = new EmailAnalyticsService({ config, settings, - eventProcessor: new EventProcessor({db}), + eventProcessor, providers: [ new MailgunProvider({config, settings}) ], diff --git a/ghost/core/core/server/services/email-analytics/jobs/fetch-latest/index.js b/ghost/core/core/server/services/email-analytics/jobs/fetch-latest/index.js new file mode 100644 index 0000000000..a31ca955fb --- /dev/null +++ b/ghost/core/core/server/services/email-analytics/jobs/fetch-latest/index.js @@ -0,0 +1,50 @@ +const {parentPort} = require('worker_threads'); + +// recurring job to fetch analytics since the most recently seen event timestamp + +// Exit early when cancelled to prevent stalling shutdown. No cleanup needed when cancelling as everything is idempotent and will pick up +// where it left off on next run +function cancel() { + if (parentPort) { + parentPort.postMessage('Email analytics fetch-latest job cancelled before completion'); + parentPort.postMessage('cancelled'); + } else { + setTimeout(() => { + process.exit(0); + }, 1000); + } +} + +if (parentPort) { + parentPort.once('message', (message) => { + if (message === 'cancel') { + return cancel(); + } + }); +} + +(async () => { + const {run} = require('./run'); + const {eventStats, aggregateEndDate, fetchStartDate} = await run({ + domainEvents: { + dispatch(event) { + parentPort.postMessage({ + event: { + type: event.constructor.name, + data: event + } + }); + } + } + }); + + if (parentPort) { + parentPort.postMessage(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`); + parentPort.postMessage('done'); + } else { + // give the logging pipes time finish writing before exit + setTimeout(() => { + process.exit(0); + }, 1000); + } +})(); diff --git a/ghost/core/core/server/services/email-analytics/jobs/fetch-latest.js b/ghost/core/core/server/services/email-analytics/jobs/fetch-latest/run.js similarity index 51% rename from ghost/core/core/server/services/email-analytics/jobs/fetch-latest.js rename to ghost/core/core/server/services/email-analytics/jobs/fetch-latest/run.js index 447657c578..e646264251 100644 --- a/ghost/core/core/server/services/email-analytics/jobs/fetch-latest.js +++ b/ghost/core/core/server/services/email-analytics/jobs/fetch-latest/run.js @@ -1,32 +1,8 @@ -const {parentPort} = require('worker_threads'); const debug = require('@tryghost/debug')('jobs:email-analytics:fetch-latest'); -// recurring job to fetch analytics since the most recently seen event timestamp - -// Exit early when cancelled to prevent stalling shutdown. No cleanup needed when cancelling as everything is idempotent and will pick up -// where it left off on next run -function cancel() { - if (parentPort) { - parentPort.postMessage('Email analytics fetch-latest job cancelled before completion'); - parentPort.postMessage('cancelled'); - } else { - setTimeout(() => { - process.exit(0); - }, 1000); - } -} - -if (parentPort) { - parentPort.once('message', (message) => { - if (message === 'cancel') { - return cancel(); - } - }); -} - -(async () => { - const config = require('../../../../shared/config'); - const db = require('../../../data/db'); +async function run({domainEvents}) { + const config = require('../../../../../shared/config'); + const db = require('../../../../data/db'); const settingsRows = await db.knex('settings') .whereIn('key', ['mailgun_api_key', 'mailgun_domain', 'mailgun_base_url']); @@ -44,14 +20,21 @@ if (parentPort) { }; const {EmailAnalyticsService} = require('@tryghost/email-analytics-service'); - const EventProcessor = require('../lib/event-processor'); const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun'); - const queries = require('../lib/queries'); + const queries = require('../../lib/queries'); + const {EmailEventProcessor} = require('@tryghost/email-service'); + + // Since this is running in a worker thread, we cant dispatch directly + // So we post the events as a message to the job manager + const eventProcessor = new EmailEventProcessor({ + domainEvents, + db + }); const emailAnalyticsService = new EmailAnalyticsService({ config, settings, - eventProcessor: new EventProcessor({db}), + eventProcessor, providers: [ new MailgunProvider({config, settings}) ], @@ -69,14 +52,6 @@ if (parentPort) { await emailAnalyticsService.aggregateStats(eventStats); const aggregateEndDate = new Date(); debug(`Finished aggregating email analytics in ${aggregateEndDate - aggregateStartDate}ms`); - - if (parentPort) { - parentPort.postMessage(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`); - parentPort.postMessage('done'); - } else { - // give the logging pipes time finish writing before exit - setTimeout(() => { - process.exit(0); - }, 1000); - } -})(); + return {eventStats, fetchStartDate, fetchEndDate, aggregateStartDate, aggregateEndDate}; +} +module.exports.run = run; diff --git a/ghost/core/core/server/services/email-analytics/jobs/index.js b/ghost/core/core/server/services/email-analytics/jobs/index.js index 79705097a3..d067667703 100644 --- a/ghost/core/core/server/services/email-analytics/jobs/index.js +++ b/ghost/core/core/server/services/email-analytics/jobs/index.js @@ -30,7 +30,7 @@ module.exports = { jobsService.addJob({ at: `${s} ${m}/5 * * * *`, - job: path.resolve(__dirname, 'fetch-latest.js'), + job: path.resolve(__dirname, 'fetch-latest/index.js'), name: 'email-analytics-fetch-latest' }); diff --git a/ghost/core/core/server/services/email-analytics/lib/event-processor.js b/ghost/core/core/server/services/email-analytics/lib/event-processor.js deleted file mode 100644 index 5766544ecc..0000000000 --- a/ghost/core/core/server/services/email-analytics/lib/event-processor.js +++ /dev/null @@ -1,178 +0,0 @@ -const {EventProcessor} = require('@tryghost/email-analytics-service'); -const {default: ObjectID} = require('bson-objectid'); -const moment = require('moment-timezone'); - -class GhostEventProcessor extends EventProcessor { - constructor({db}) { - super(...arguments); - - this.db = db; - - // avoid having to query email_batch by provider_id for every event - this.providerIdEmailIdMap = {}; - } - - async getEmailId(event) { - if (event.emailId) { - return event.emailId; - } - - if (event.providerId) { - if (this.providerIdEmailIdMap[event.providerId]) { - return this.providerIdEmailIdMap[event.providerId]; - } - - const {emailId} = await this.db.knex('email_batches') - .select('email_id as emailId') - .where('provider_id', event.providerId) - .first() || {}; - - if (!emailId) { - return; - } - - this.providerIdEmailIdMap[event.providerId] = emailId; - return emailId; - } - - return undefined; - } - - async getMemberId(event) { - const emailId = await this.getEmailId(event); - - if (!emailId) { - return false; - } - - if (emailId && event.recipientEmail) { - const {memberId} = await this.db.knex('email_recipients') - .select('member_id as memberId') - .where('member_email', event.recipientEmail) - .where('email_id', emailId) - .first() || {}; - - return memberId; - } - - return undefined; - } - - async handleDelivered(event) { - const emailId = await this.getEmailId(event); - - if (!emailId) { - return false; - } - - const updateResult = await this.db.knex('email_recipients') - .where('email_id', '=', emailId) - .where('member_email', '=', event.recipientEmail) - .update({ - delivered_at: this.db.knex.raw('COALESCE(delivered_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')]) - }); - - return updateResult > 0; - } - - async handleOpened(event) { - const emailId = await this.getEmailId(event); - - if (!emailId) { - return false; - } - - const updateResult = await this.db.knex('email_recipients') - .where('email_id', '=', emailId) - .where('member_email', '=', event.recipientEmail) - .update({ - opened_at: this.db.knex.raw('COALESCE(opened_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')]) - }); - - // Using the default timezone set in https://github.com/TryGhost/Ghost/blob/2c5643623db0fc4db390f6997c81a73dca7ccacd/core/server/data/schema/default-settings/default-settings.json#L105 - let timezone = 'Etc/UTC'; - const timezoneData = await this.db.knex('settings').first('value').where('key', 'timezone'); - if (timezoneData && timezoneData.value) { - timezone = timezoneData.value; - } - - await this.db.knex('members') - .where('email', '=', event.recipientEmail) - .andWhere(builder => builder - .where('last_seen_at', '<', moment.utc(event.timestamp).tz(timezone).startOf('day').utc().format('YYYY-MM-DD HH:mm:ss')) - .orWhereNull('last_seen_at') - ) - .update({ - last_seen_at: moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss') - }); - - return updateResult > 0; - } - - async handleTemporaryFailed(/*event*/) { - // noop - we don't do anything with temporary failures for now - } - - async handlePermanentFailed(event) { - const emailId = await this.getEmailId(event); - - if (!emailId) { - return false; - } - - const updateResult = await this.db.knex('email_recipients') - .where('email_id', '=', emailId) - .where('member_email', '=', event.recipientEmail) - .update({ - failed_at: this.db.knex.raw('COALESCE(failed_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')]) - }); - - return updateResult > 0; - } - - async handleUnsubscribed(event) { - return this._unsubscribeFromNewsletters(event); - } - - async handleComplained(event) { - return this._unsubscribeFromNewsletters(event); - } - - async _unsubscribeFromNewsletters(event) { - const memberId = await this.getMemberId(event); - - if (!memberId) { - return false; - } - - const subscribedNewsletterIds = await this.db.knex('members_newsletters') - .where('member_id', '=', memberId) - .pluck('newsletter_id'); - - await this.db.knex('members_newsletters') - .where('member_id', '=', memberId) - .del(); - - const nowUTC = moment.utc().toDate(); - for (const newsletterId of subscribedNewsletterIds) { - await this.db.knex('members_subscribe_events').insert({ - id: ObjectID().toHexString(), - member_id: memberId, - newsletter_id: newsletterId, - subscribed: false, - created_at: nowUTC, - source: 'member' - }); - } - - const updateResult = await this.db.knex('members') - .where('id', '=', memberId) - .update({ - updated_at: moment.utc().toDate() - }); - - return updateResult > 0; - } -} - -module.exports = GhostEventProcessor; diff --git a/ghost/core/core/server/services/email-service/wrapper.js b/ghost/core/core/server/services/email-service/wrapper.js index 47a3b4e63c..f2ec7e388f 100644 --- a/ghost/core/core/server/services/email-service/wrapper.js +++ b/ghost/core/core/server/services/email-service/wrapper.js @@ -3,7 +3,11 @@ const ObjectID = require('bson-objectid').default; class EmailServiceWrapper { init() { - const {EmailService, EmailController, EmailRenderer, SendingService, BatchSendingService, EmailSegmenter} = require('@tryghost/email-service'); + if (this.service) { + return; + } + + const {EmailService, EmailController, EmailRenderer, SendingService, BatchSendingService, EmailSegmenter, EmailEventStorage} = require('@tryghost/email-service'); const {Post, Newsletter, Email, EmailBatch, EmailRecipient, Member} = require('../../models'); const settingsCache = require('../../../shared/settings-cache'); const jobsService = require('../jobs'); @@ -11,6 +15,7 @@ class EmailServiceWrapper { const db = require('../../data/db'); const membersRepository = membersService.api.members; const limitService = require('../limits'); + const domainEvents = require('@tryghost/domain-events'); const emailRenderer = new EmailRenderer(); const sendingService = new SendingService({ @@ -58,6 +63,12 @@ class EmailServiceWrapper { Email } }); + + this.eventStorage = new EmailEventStorage({ + db, + membersRepository + }); + this.eventStorage.listen(domainEvents); } } diff --git a/ghost/core/core/server/services/jobs/job-service.js b/ghost/core/core/server/services/jobs/job-service.js index 4676e6f0f0..3b8298e73c 100644 --- a/ghost/core/core/server/services/jobs/job-service.js +++ b/ghost/core/core/server/services/jobs/job-service.js @@ -7,6 +7,7 @@ const JobManager = require('@tryghost/job-manager'); const logging = require('@tryghost/logging'); const models = require('../../models'); const sentry = require('../../../shared/sentry'); +const domainEvents = require('@tryghost/domain-events'); const errorHandler = (error, workerMeta) => { logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`); @@ -15,7 +16,9 @@ const errorHandler = (error, workerMeta) => { }; const workerMessageHandler = ({name, message}) => { - logging.info(`Worker for job ${name} sent a message: ${message}`); + if (typeof message === 'string') { + logging.info(`Worker for job ${name} sent a message: ${message}`); + } }; const initTestMode = () => { @@ -39,7 +42,7 @@ const initTestMode = () => { }, 5000); }; -const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job}); +const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents}); module.exports = jobManager; module.exports.initTestMode = initTestMode; diff --git a/ghost/core/core/server/services/members-events/index.js b/ghost/core/core/server/services/members-events/index.js index 69f1774fa6..156f4132ad 100644 --- a/ghost/core/core/server/services/members-events/index.js +++ b/ghost/core/core/server/services/members-events/index.js @@ -27,7 +27,7 @@ class MembersEventsServiceWrapper { services: { settingsCache }, - async getMembersApi() { + getMembersApi() { return members.api; } }); diff --git a/ghost/core/test/integration/services/email-service/email-event-storage.test.js b/ghost/core/test/integration/services/email-service/email-event-storage.test.js new file mode 100644 index 0000000000..9fa13af25d --- /dev/null +++ b/ghost/core/test/integration/services/email-service/email-event-storage.test.js @@ -0,0 +1,438 @@ +const sinon = require('sinon'); +const {agentProvider, fixtureManager, mockManager} = require('../../../utils/e2e-framework'); +const assert = require('assert'); +const models = require('../../../../core/server/models'); +const domainEvents = require('@tryghost/domain-events'); +const MailgunClient = require('@tryghost/mailgun-client'); +const {run} = require('../../../../core/server/services/email-analytics/jobs/fetch-latest/run.js'); +const membersService = require('../../../../core/server/services/members'); + +async function sleep(ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +// Test the whole E2E flow from Mailgun events -> handling and storage +describe('EmailEventStorage', function () { + let _mailgunClient; + let agent; + let events = []; + let jobsService; + + before(async function () { + agent = await agentProvider.getAdminAPIAgent(); + await fixtureManager.init('newsletters', 'members:newsletters', 'members:emails'); + await agent.loginAsOwner(); + + // Only create reference to jobsService after Ghost boot + jobsService = require('../../../../core/server/services/jobs'); + + sinon.stub(MailgunClient.prototype, 'fetchEvents').callsFake(async function (_, batchHandler) { + const normalizedEvents = events.map(this.normalizeEvent) || []; + return [await batchHandler(normalizedEvents)]; + }); + }); + + it('Can handle delivered events', async function () { + const emailBatch = fixtureManager.get('email_batches', 0); + const emailId = emailBatch.email_id; + + const emailRecipient = fixtureManager.get('email_recipients', 0); + assert(emailRecipient.batch_id === emailBatch.id); + const memberId = emailRecipient.member_id; + const providerId = emailBatch.provider_id; + const timestamp = new Date(2000, 0, 1); + + events = [{ + event: 'delivered', + recipient: emailRecipient.member_email, + 'user-variables': { + 'email-id': emailId + }, + message: { + headers: { + 'message-id': providerId + } + }, + // unix timestamp + timestamp: Math.round(timestamp.getTime() / 1000) + }]; + + const initialModel = await models.EmailRecipient.findOne({ + id: emailRecipient.id + }, {require: true}); + + assert.equal(initialModel.get('delivered_at'), null); + + // Fire event processing + // We use offloading to have correct coverage and usage of worker thread + const {eventStats: result} = await run({ + domainEvents + }); + assert.equal(result.delivered, 1); + assert.deepEqual(result.emailIds, [emailId]); + assert.deepEqual(result.memberIds, [memberId]); + + // Now wait for events processed + await sleep(100); + + // Check if status has changed to delivered, with correct timestamp + const updatedEmailRecipient = await models.EmailRecipient.findOne({ + id: emailRecipient.id + }, {require: true}); + + assert.equal(updatedEmailRecipient.get('delivered_at').toUTCString(), timestamp.toUTCString()); + }); + + it('Can handle delivered events without user variables', async function () { + const emailBatch = fixtureManager.get('email_batches', 0); + const emailId = emailBatch.email_id; + + const emailRecipient = fixtureManager.get('email_recipients', 0); + assert(emailRecipient.batch_id === emailBatch.id); + const memberId = emailRecipient.member_id; + const providerId = emailBatch.provider_id; + const timestamp = new Date(2000, 0, 1); + + // Reset + await models.EmailRecipient.edit({delivered_at: null}, { + id: emailRecipient.id + }); + + events = [{ + event: 'delivered', + recipient: emailRecipient.member_email, + 'user-variables': {}, + message: { + headers: { + 'message-id': providerId + } + }, + // unix timestamp + timestamp: Math.round(timestamp.getTime() / 1000) + }]; + + const initialModel = await models.EmailRecipient.findOne({ + id: emailRecipient.id + }, {require: true}); + + assert.equal(initialModel.get('delivered_at'), null); + + // Fire event processing + // We use offloading to have correct coverage and usage of worker thread + const {eventStats: result} = await run({ + domainEvents + }); + assert.equal(result.delivered, 1); + assert.deepEqual(result.emailIds, [emailId]); + assert.deepEqual(result.memberIds, [memberId]); + + // Now wait for events processed + await sleep(100); + + // Check if status has changed to delivered, with correct timestamp + const updatedEmailRecipient = await models.EmailRecipient.findOne({ + id: emailRecipient.id + }, {require: true}); + + assert.equal(updatedEmailRecipient.get('delivered_at').toUTCString(), timestamp.toUTCString()); + }); + + it('Can handle opened events', async function () { + const emailBatch = fixtureManager.get('email_batches', 0); + const emailId = emailBatch.email_id; + + const emailRecipient = fixtureManager.get('email_recipients', 0); + assert(emailRecipient.batch_id === emailBatch.id); + const memberId = emailRecipient.member_id; + const providerId = emailBatch.provider_id; + const timestamp = new Date(2000, 0, 1); + + events = [{ + event: 'opened', + recipient: emailRecipient.member_email, + 'user-variables': { + 'email-id': emailId + }, + message: { + headers: { + 'message-id': providerId + } + }, + // unix timestamp + timestamp: Math.round(timestamp.getTime() / 1000) + }]; + + const initialModel = await models.EmailRecipient.findOne({ + id: emailRecipient.id + }, {require: true}); + + assert.equal(initialModel.get('opened_at'), null); + + // Fire event processing + // We use offloading to have correct coverage and usage of worker thread + const {eventStats: result} = await run({ + domainEvents + }); + assert.equal(result.opened, 1); + assert.deepEqual(result.emailIds, [emailId]); + assert.deepEqual(result.memberIds, [memberId]); + + // Now wait for events processed + await sleep(100); + + // Check if status has changed to delivered, with correct timestamp + const updatedEmailRecipient = await models.EmailRecipient.findOne({ + id: emailRecipient.id + }, {require: true}); + + assert.equal(updatedEmailRecipient.get('opened_at').toUTCString(), timestamp.toUTCString()); + + // TODO: check last seen at + }); + + it('Can handle permanent failure events', async function () { + const emailBatch = fixtureManager.get('email_batches', 0); + const emailId = emailBatch.email_id; + + const emailRecipient = fixtureManager.get('email_recipients', 0); + assert(emailRecipient.batch_id === emailBatch.id); + const memberId = emailRecipient.member_id; + const providerId = emailBatch.provider_id; + const timestamp = new Date(2000, 0, 1); + + events = [{ + event: 'failed', + severity: 'permanent', + recipient: emailRecipient.member_email, + 'user-variables': { + 'email-id': emailId + }, + message: { + headers: { + 'message-id': providerId + } + }, + // unix timestamp + timestamp: Math.round(timestamp.getTime() / 1000) + }]; + + const initialModel = await models.EmailRecipient.findOne({ + id: emailRecipient.id + }, {require: true}); + + assert.equal(initialModel.get('failed_at'), null); + + // Fire event processing + // We use offloading to have correct coverage and usage of worker thread + const {eventStats: result} = await run({ + domainEvents + }); + assert.equal(result.permanentFailed, 1); + assert.deepEqual(result.emailIds, [emailId]); + assert.deepEqual(result.memberIds, [memberId]); + + // Now wait for events processed + await sleep(200); + + // Check if status has changed to delivered, with correct timestamp + const updatedEmailRecipient = await models.EmailRecipient.findOne({ + id: emailRecipient.id + }, {require: true}); + + assert.equal(updatedEmailRecipient.get('failed_at').toUTCString(), timestamp.toUTCString()); + }); + + it('Can handle tempoary failure events', async function () { + const emailBatch = fixtureManager.get('email_batches', 0); + const emailId = emailBatch.email_id; + + const emailRecipient = fixtureManager.get('email_recipients', 0); + assert(emailRecipient.batch_id === emailBatch.id); + const memberId = emailRecipient.member_id; + const providerId = emailBatch.provider_id; + const timestamp = new Date(2000, 0, 1); + + // Reset + await models.EmailRecipient.edit({failed_at: null}, { + id: emailRecipient.id + }); + + events = [{ + event: 'failed', + severity: 'temporary', + recipient: emailRecipient.member_email, + 'user-variables': { + 'email-id': emailId + }, + message: { + headers: { + 'message-id': providerId + } + }, + // unix timestamp + timestamp: Math.round(timestamp.getTime() / 1000) + }]; + + const initialModel = await models.EmailRecipient.findOne({ + id: emailRecipient.id + }, {require: true}); + + assert.equal(initialModel.get('failed_at'), null); + + // Fire event processing + // We use offloading to have correct coverage and usage of worker thread + const {eventStats: result} = await run({ + domainEvents + }); + assert.equal(result.temporaryFailed, 1); + assert.deepEqual(result.emailIds, [emailId]); + assert.deepEqual(result.memberIds, [memberId]); + + // Now wait for events processed + await sleep(200); + + // Check if status has changed to delivered, with correct timestamp + const updatedEmailRecipient = await models.EmailRecipient.findOne({ + id: emailRecipient.id + }, {require: true}); + + // Not mark as failed + assert.equal(initialModel.get('failed_at'), null); + }); + + it('Can handle complaint events', async function () { + const emailBatch = fixtureManager.get('email_batches', 0); + const emailId = emailBatch.email_id; + + const emailRecipient = fixtureManager.get('email_recipients', 0); + assert(emailRecipient.batch_id === emailBatch.id); + const memberId = emailRecipient.member_id; + const providerId = emailBatch.provider_id; + const timestamp = new Date(2000, 0, 1); + + // Check not unsubscribed + const memberInitial = await membersService.api.members.get({id: memberId}, {withRelated: ['newsletters']}); + assert.notEqual(memberInitial.related('newsletters').length, 0, 'This test requires a member that is subscribed to at least one newsletter'); + + events = [{ + event: 'complained', + recipient: emailRecipient.member_email, + 'user-variables': { + 'email-id': emailId + }, + message: { + headers: { + 'message-id': providerId + } + }, + // unix timestamp + timestamp: Math.round(timestamp.getTime() / 1000) + }]; + + // Fire event processing + // We use offloading to have correct coverage and usage of worker thread + const {eventStats: result} = await run({ + domainEvents + }); + assert.equal(result.complained, 1); + assert.deepEqual(result.emailIds, [emailId]); + assert.deepEqual(result.memberIds, [memberId]); + + // Now wait for events processed + await sleep(200); + + // Check if unsubscribed + const member = await membersService.api.members.get({id: memberId}, {withRelated: ['newsletters']}); + assert.equal(member.related('newsletters').length, 0); + }); + + it('Can handle unsubscribe events', async function () { + const emailBatch = fixtureManager.get('email_batches', 0); + const emailId = emailBatch.email_id; + + const emailRecipient = fixtureManager.get('email_recipients', 0); + assert(emailRecipient.batch_id === emailBatch.id); + const memberId = emailRecipient.member_id; + const providerId = emailBatch.provider_id; + const timestamp = new Date(2000, 0, 1); + + // Reset + await membersService.api.members.update({newsletters: [ + { + id: fixtureManager.get('newsletters', 0).id + } + ]}, {id: memberId}); + + // Check not unsubscribed + const memberInitial = await membersService.api.members.get({id: memberId}, {withRelated: ['newsletters']}); + assert.notEqual(memberInitial.related('newsletters').length, 0, 'This test requires a member that is subscribed to at least one newsletter'); + + events = [{ + event: 'unsubscribed', + recipient: emailRecipient.member_email, + 'user-variables': { + 'email-id': emailId + }, + message: { + headers: { + 'message-id': providerId + } + }, + // unix timestamp + timestamp: Math.round(timestamp.getTime() / 1000) + }]; + + // Fire event processing + // We use offloading to have correct coverage and usage of worker thread + const {eventStats: result} = await run({ + domainEvents + }); + assert.equal(result.unsubscribed, 1); + assert.deepEqual(result.emailIds, [emailId]); + assert.deepEqual(result.memberIds, [memberId]); + + // Now wait for events processed + await sleep(200); + + // Check if unsubscribed + const member = await membersService.api.members.get({id: memberId}, {withRelated: ['newsletters']}); + assert.equal(member.related('newsletters').length, 0); + }); + + it('Can handle unknown events', async function () { + const emailBatch = fixtureManager.get('email_batches', 0); + const emailId = emailBatch.email_id; + + const emailRecipient = fixtureManager.get('email_recipients', 0); + assert(emailRecipient.batch_id === emailBatch.id); + const memberId = emailRecipient.member_id; + const providerId = emailBatch.provider_id; + const timestamp = new Date(2000, 0, 1); + + events = [{ + event: 'ceci-nest-pas-un-event', + recipient: emailRecipient.member_email, + 'user-variables': { + 'email-id': emailId + }, + message: { + headers: { + 'message-id': providerId + } + }, + // unix timestamp + timestamp: Math.round(timestamp.getTime() / 1000) + }]; + + // Fire event processing + // We use offloading to have correct coverage and usage of worker thread + const {eventStats: result} = await run({ + domainEvents + }); + assert.equal(result.unhandled, 1); + assert.deepEqual(result.emailIds, []); + assert.deepEqual(result.memberIds, []); + }); +}); diff --git a/ghost/domain-events/lib/DomainEvents.js b/ghost/domain-events/lib/DomainEvents.js index 6c74bdb14b..892f6a3deb 100644 --- a/ghost/domain-events/lib/DomainEvents.js +++ b/ghost/domain-events/lib/DomainEvents.js @@ -37,7 +37,18 @@ class DomainEvents { * @returns {void} */ static dispatch(event) { - DomainEvents.ee.emit(event.constructor.name, event); + DomainEvents.dispatchRaw(event.constructor.name, event); + } + + /** + * Dispatch an event in case you don't have an instance of the event class, but you do have the event name and event data. + * @template Data + * @param {string} name + * @param {Data} data + * @returns {void} + */ + static dispatchRaw(name, data) { + DomainEvents.ee.emit(name, data); } } diff --git a/ghost/email-analytics-service/index.js b/ghost/email-analytics-service/index.js index f0d65d7e09..d592a9f520 100644 --- a/ghost/email-analytics-service/index.js +++ b/ghost/email-analytics-service/index.js @@ -1,5 +1,4 @@ module.exports = { EmailAnalyticsService: require('./lib/email-analytics-service'), - EventProcessingResult: require('./lib/event-processing-result'), - EventProcessor: require('./lib/event-processor') + EventProcessingResult: require('./lib/event-processing-result') }; diff --git a/ghost/email-analytics-service/lib/email-analytics-service.js b/ghost/email-analytics-service/lib/email-analytics-service.js index 8f625e5997..ce8889b611 100644 --- a/ghost/email-analytics-service/lib/email-analytics-service.js +++ b/ghost/email-analytics-service/lib/email-analytics-service.js @@ -1,8 +1,22 @@ const EventProcessingResult = require('./event-processing-result'); const debug = require('@tryghost/debug')('services:email-analytics'); +/** + * @typedef {import('@tryghost/email-service').EmailEventProcessor} EmailEventProcessor + */ + module.exports = class EmailAnalyticsService { - constructor({config, settings, queries, eventProcessor, providers} = {}) { + config; + settings; + queries; + eventProcessor; + providers; + + /** + * @param {object} dependencies + * @param {EmailEventProcessor} dependencies.eventProcessor + */ + constructor({config, settings, queries, eventProcessor, providers}) { this.config = config; this.settings = settings; this.queries = queries; @@ -61,13 +75,106 @@ module.exports = class EmailAnalyticsService { const result = new EventProcessingResult(); for (const event of events) { - const batchResult = await this.eventProcessor.process(event); + const batchResult = await this.processEvent(event); result.merge(batchResult); } return result; } + /** + * + * @param {{type: any; severity: any; recipientEmail: any; emailId: any; providerId: string; timestamp: Date;}} event + * @returns {Promise} + */ + async processEvent(event) { + if (event.type === 'delivered') { + const recipient = await this.eventProcessor.handleDelivered({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp); + + if (recipient) { + return new EventProcessingResult({ + delivered: 1, + emailIds: [recipient.emailId], + memberIds: [recipient.memberId] + }); + } + + return new EventProcessingResult({unprocessable: 1}); + } + + if (event.type === 'opened') { + const recipient = await this.eventProcessor.handleOpened({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp); + + if (recipient) { + return new EventProcessingResult({ + opened: 1, + emailIds: [recipient.emailId], + memberIds: [recipient.memberId] + }); + } + + return new EventProcessingResult({unprocessable: 1}); + } + + if (event.type === 'failed') { + if (event.severity === 'permanent') { + const recipient = await this.eventProcessor.handlePermanentFailed({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp); + + if (recipient) { + return new EventProcessingResult({ + permanentFailed: 1, + emailIds: [recipient.emailId], + memberIds: [recipient.memberId] + }); + } + + return new EventProcessingResult({unprocessable: 1}); + } else { + const recipient = await this.eventProcessor.handleTemporaryFailed({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp); + + if (recipient) { + return new EventProcessingResult({ + temporaryFailed: 1, + emailIds: [recipient.emailId], + memberIds: [recipient.memberId] + }); + } + + return new EventProcessingResult({unprocessable: 1}); + } + } + + if (event.type === 'unsubscribed') { + const recipient = await this.eventProcessor.handleUnsubscribed({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp); + + if (recipient) { + return new EventProcessingResult({ + unsubscribed: 1, + emailIds: [recipient.emailId], + memberIds: [recipient.memberId] + }); + } + + return new EventProcessingResult({unprocessable: 1}); + } + + if (event.type === 'complained') { + const recipient = await this.eventProcessor.handleComplained({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp); + + if (recipient) { + return new EventProcessingResult({ + complained: 1, + emailIds: [recipient.emailId], + memberIds: [recipient.memberId] + }); + } + + return new EventProcessingResult({unprocessable: 1}); + } + + return new EventProcessingResult({unhandled: 1}); + } + async aggregateStats({emailIds = [], memberIds = []}) { for (const emailId of emailIds) { await this.aggregateEmailStats(emailId); diff --git a/ghost/email-analytics-service/lib/event-processing-result.js b/ghost/email-analytics-service/lib/event-processing-result.js index b87af6fb0b..f1e0e0e896 100644 --- a/ghost/email-analytics-service/lib/event-processing-result.js +++ b/ghost/email-analytics-service/lib/event-processing-result.js @@ -1,6 +1,20 @@ const _ = require('lodash'); class EventProcessingResult { + /** + * @param {object} result + * @param {number} [result.delivered] + * @param {number} [result.opened] + * @param {number} [result.temporaryFailed] + * @param {number} [result.permanentFailed] + * @param {number} [result.unsubscribed] + * @param {number} [result.complained] + * @param {number} [result.unhandled] + * @param {number} [result.unprocessable] + * @param {number} [result.processingFailures] + * @param {string[]} [result.emailIds] + * @param {string[]} [result.memberIds] + */ constructor(result = {}) { // counts this.delivered = 0; diff --git a/ghost/email-analytics-service/lib/event-processor.js b/ghost/email-analytics-service/lib/event-processor.js deleted file mode 100644 index 3cf4908ab0..0000000000 --- a/ghost/email-analytics-service/lib/event-processor.js +++ /dev/null @@ -1,210 +0,0 @@ -module.exports = class EventProcessor { - constructor() { - } - - // override these in a sub-class to define app-specific behavior - - async getEmailId(/*event*/) { - return undefined; - } - - async getMemberId(/*event*/) { - return undefined; - } - - async handleDelivered(/*event*/) { - return false; - } - - async handleOpened(/*event*/) { - return false; - } - - async handleTemporaryFailed(/*event*/) { - return false; - } - - async handlePermanentFailed(/*event*/) { - return false; - } - - async handleUnsubscribed(/*event*/) { - return false; - } - - async handleComplained(/*event*/) { - return false; - } - - // superclass functionality ------------------------------------------------ - - async process(event) { - if (event.type === 'delivered') { - return this._handleDelivered(event); - } - - if (event.type === 'opened') { - return this._handleOpened(event); - } - - if (event.type === 'failed') { - if (event.severity === 'permanent') { - return this._handlePermanentFailed(event); - } else { - return this._handleTemporaryFailed(event); - } - } - - if (event.type === 'unsubscribed') { - return this._handleUnsubscribed(event); - } - - if (event.type === 'complained') { - return this._handleComplained(event); - } - - return { - unhandled: 1 - }; - } - - async _handleDelivered(event) { - const emailId = await this._getEmailId(event); - - if (!emailId) { - return {unprocessable: 1}; - } - - const handlerSuccess = await this.handleDelivered(event); - - if (handlerSuccess) { - const memberId = await this._getMemberId(event); - - return { - delivered: 1, - emailIds: [emailId], - memberIds: [memberId] - }; - } - - return {unprocessable: 1}; - } - - async _handleOpened(event) { - const emailId = await this._getEmailId(event); - - if (!emailId) { - return {unprocessable: 1}; - } - - const handlerSuccess = await this.handleOpened(event); - - if (handlerSuccess) { - const memberId = await this._getMemberId(event); - - return { - opened: 1, - emailIds: [emailId], - memberIds: [memberId] - }; - } - - return {unprocessable: 1}; - } - - async _handlePermanentFailed(event) { - const emailId = await this._getEmailId(event); - - if (!emailId) { - return {unprocessable: 1}; - } - - const handlerSuccess = await this.handlePermanentFailed(event); - - if (handlerSuccess) { - return { - permanentFailed: 1, - emailIds: [emailId] - }; - } - - return {unprocessable: 1}; - } - - async _handleTemporaryFailed(event) { - const emailId = await this._getEmailId(event); - - if (!emailId) { - return {unprocessable: 1}; - } - - const handlerSuccess = await this.handleTemporaryFailed(event); - - if (handlerSuccess) { - return { - temporaryFailed: 1, - emailIds: [emailId] - }; - } - - return {unprocessable: 1}; - } - - async _handleUnsubscribed(event) { - const emailId = await this._getEmailId(event); - - if (!emailId) { - return {unprocessable: 1}; - } - - const handlerSuccess = await this.handleUnsubscribed(event); - - if (handlerSuccess) { - return { - unsubscribed: 1, - emailIds: [emailId] - }; - } - - return { - unprocessable: 1 - }; - } - - async _handleComplained(event) { - const emailId = await this._getEmailId(event); - - if (!emailId) { - return {unprocessable: 1}; - } - - const handlerSuccess = await this.handleComplained(event); - - if (handlerSuccess) { - return { - complained: 1, - emailIds: [emailId] - }; - } - - return { - unprocessable: 1 - }; - } - - async _getEmailId(event) { - if (event.emailId) { - return event.emailId; - } - - return await this.getEmailId(event); - } - - async _getMemberId(event) { - if (event.memberId) { - return event.memberId; - } - - return await this.getMemberId(event); - } -}; diff --git a/ghost/email-analytics-service/test/email-analytics-service.test.js b/ghost/email-analytics-service/test/email-analytics-service.test.js index 47f886c20c..021ae6555a 100644 --- a/ghost/email-analytics-service/test/email-analytics-service.test.js +++ b/ghost/email-analytics-service/test/email-analytics-service.test.js @@ -5,22 +5,35 @@ require('./utils'); const sinon = require('sinon'); const { - EmailAnalyticsService, - EventProcessor + EmailAnalyticsService } = require('..'); const EventProcessingResult = require('../lib/event-processing-result'); describe('EmailAnalyticsService', function () { + let eventProcessor; + beforeEach(function () { + eventProcessor = {}; + eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + }); + describe('fetchAll', function () { - let eventProcessor; let providers; let queries; beforeEach(function () { - eventProcessor = new EventProcessor(); - eventProcessor.handleDelivered = sinon.fake.resolves(true); - eventProcessor.handleOpened = sinon.fake.resolves(true); - providers = { testing: { async fetchAll(batchHandler) { @@ -102,9 +115,6 @@ describe('EmailAnalyticsService', function () { describe('processEventBatch', function () { it('uses passed-in event processor', async function () { - const eventProcessor = new EventProcessor(); - eventProcessor.handleDelivered = sinon.stub().resolves(true); - const service = new EmailAnalyticsService({ eventProcessor }); @@ -124,8 +134,10 @@ describe('EmailAnalyticsService', function () { result.should.deepEqual(new EventProcessingResult({ delivered: 2, - unprocessable: 1, - emailIds: [1, 2] + opened: 1, + unprocessable: 0, + emailIds: [1, 2], + memberIds: [1] })); }); }); diff --git a/ghost/email-analytics-service/test/event-processor.test.js b/ghost/email-analytics-service/test/event-processor.test.js deleted file mode 100644 index b1004ce3fa..0000000000 --- a/ghost/email-analytics-service/test/event-processor.test.js +++ /dev/null @@ -1,431 +0,0 @@ -// Switch these lines once there are useful utils -// const testUtils = require('./utils'); -require('./utils'); - -const sinon = require('sinon'); - -const {EventProcessor} = require('..'); - -class CustomEventProcessor extends EventProcessor { - constructor() { - super(...arguments); - - this.getEmailId = sinon.fake.resolves('emailId'); - this.getMemberId = sinon.fake.resolves('memberId'); - - this.handleDelivered = sinon.fake.resolves(true); - this.handleOpened = sinon.fake.resolves(true); - this.handleTemporaryFailed = sinon.fake.resolves(true); - this.handlePermanentFailed = sinon.fake.resolves(true); - this.handleUnsubscribed = sinon.fake.resolves(true); - this.handleComplained = sinon.fake.resolves(true); - } -} - -describe('EventProcessor', function () { - let eventProcessor; - - beforeEach(function () { - eventProcessor = new CustomEventProcessor(); - }); - - afterEach(function () { - sinon.restore(); - }); - - describe('delivered', function () { - it('works', async function () { - const result = await eventProcessor.process({ - type: 'delivered' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.calledOnce.should.be.true(); - eventProcessor.handleDelivered.calledOnce.should.be.true(); - - result.should.deepEqual({ - delivered: 1, - emailIds: ['emailId'], - memberIds: ['memberId'] - }); - }); - - it('gets emailId and memberId directly from event if available', async function () { - const result = await eventProcessor.process({ - type: 'delivered', - emailId: 'testEmailId', - memberId: 'testMemberId' - }); - - eventProcessor.getEmailId.called.should.be.false(); - eventProcessor.getMemberId.called.should.be.false(); - eventProcessor.handleDelivered.calledOnce.should.be.true(); - - result.should.deepEqual({ - delivered: 1, - emailIds: ['testEmailId'], - memberIds: ['testMemberId'] - }); - }); - - it('does not process if email id is not found', async function () { - sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null)); - - const result = await eventProcessor.process({ - type: 'delivered' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.calledOnce.should.be.false(); - eventProcessor.handleDelivered.calledOnce.should.be.false(); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - - it('does not process if handleDelivered is not overridden', async function () { - // test non-extended superclass instance - eventProcessor = new EventProcessor(); - - const result = await eventProcessor.process({ - type: 'delivered', - emailId: 'testEmailId', - memberId: 'testMemberId' - }); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - }); - - describe('opened', function () { - it('works', async function () { - const result = await eventProcessor.process({ - type: 'opened' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.calledOnce.should.be.true(); - eventProcessor.handleOpened.calledOnce.should.be.true(); - - result.should.deepEqual({ - opened: 1, - emailIds: ['emailId'], - memberIds: ['memberId'] - }); - }); - - it('gets emailId and memberId directly from event if available', async function () { - const result = await eventProcessor.process({ - type: 'opened', - emailId: 'testEmailId', - memberId: 'testMemberId' - }); - - eventProcessor.getEmailId.called.should.be.false(); - eventProcessor.getMemberId.called.should.be.false(); - eventProcessor.handleOpened.calledOnce.should.be.true(); - - result.should.deepEqual({ - opened: 1, - emailIds: ['testEmailId'], - memberIds: ['testMemberId'] - }); - }); - - it('does not process if email id is not found', async function () { - sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null)); - - const result = await eventProcessor.process({ - type: 'opened' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.calledOnce.should.be.false(); - eventProcessor.handleOpened.calledOnce.should.be.false(); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - - it('does not process if handleOpened is not overridden', async function () { - // test non-extended superclass instance - eventProcessor = new EventProcessor(); - - const result = await eventProcessor.process({ - type: 'opened', - emailId: 'testEmailId', - memberId: 'testMemberId' - }); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - }); - - describe('failed - permanent', function () { - it('works', async function () { - const result = await eventProcessor.process({ - type: 'failed', - severity: 'permanent' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.called.should.be.false(); - eventProcessor.handlePermanentFailed.calledOnce.should.be.true(); - - result.should.deepEqual({ - permanentFailed: 1, - emailIds: ['emailId'] - }); - }); - - it('gets emailId directly from event if available', async function () { - const result = await eventProcessor.process({ - type: 'failed', - severity: 'permanent', - emailId: 'testEmailId' - }); - - eventProcessor.getEmailId.called.should.be.false(); - eventProcessor.getMemberId.called.should.be.false(); - eventProcessor.handlePermanentFailed.calledOnce.should.be.true(); - - result.should.deepEqual({ - permanentFailed: 1, - emailIds: ['testEmailId'] - }); - }); - - it('does not process if email id is not found', async function () { - sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null)); - - const result = await eventProcessor.process({ - type: 'failed', - severity: 'permanent' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.called.should.be.false(); - eventProcessor.handlePermanentFailed.calledOnce.should.be.false(); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - - it('does not process if handlePermanentFailed is not overridden', async function () { - // test non-extended superclass instance - eventProcessor = new EventProcessor(); - - const result = await eventProcessor.process({ - type: 'opened', - severity: 'permanent', - emailId: 'testEmailId' - }); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - }); - - describe('failed - temporary', function () { - it('works', async function () { - const result = await eventProcessor.process({ - type: 'failed', - severity: 'temporary' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.called.should.be.false(); - eventProcessor.handleTemporaryFailed.calledOnce.should.be.true(); - - result.should.deepEqual({ - temporaryFailed: 1, - emailIds: ['emailId'] - }); - }); - - it('gets emailId directly from event if available', async function () { - const result = await eventProcessor.process({ - type: 'failed', - severity: 'temporary', - emailId: 'testEmailId' - }); - - eventProcessor.getEmailId.called.should.be.false(); - eventProcessor.getMemberId.called.should.be.false(); - eventProcessor.handleTemporaryFailed.calledOnce.should.be.true(); - - result.should.deepEqual({ - temporaryFailed: 1, - emailIds: ['testEmailId'] - }); - }); - - it('does not process if email id is not found', async function () { - sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null)); - - const result = await eventProcessor.process({ - type: 'failed', - severity: 'temporary' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.called.should.be.false(); - eventProcessor.handleTemporaryFailed.calledOnce.should.be.false(); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - - it('does not process if handleTemporaryFailed is not overridden', async function () { - // test non-extended superclass instance - eventProcessor = new EventProcessor(); - - const result = await eventProcessor.process({ - type: 'opened', - severity: 'temporary', - emailId: 'testEmailId' - }); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - }); - - describe('unsubscribed', function () { - it('works', async function () { - const result = await eventProcessor.process({ - type: 'unsubscribed' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.calledOnce.should.be.false(); - eventProcessor.handleUnsubscribed.calledOnce.should.be.true(); - - result.should.deepEqual({ - unsubscribed: 1, - emailIds: ['emailId'] - }); - }); - - it('gets emailId and memberId directly from event if available', async function () { - const result = await eventProcessor.process({ - type: 'unsubscribed', - emailId: 'testEmailId', - memberId: 'testMemberId' - }); - - eventProcessor.getEmailId.called.should.be.false(); - eventProcessor.getMemberId.called.should.be.false(); - eventProcessor.handleUnsubscribed.calledOnce.should.be.true(); - - result.should.deepEqual({ - unsubscribed: 1, - emailIds: ['testEmailId'] - }); - }); - - it('does not process if email id is not found', async function () { - sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null)); - - const result = await eventProcessor.process({ - type: 'unsubscribed' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.calledOnce.should.be.false(); - eventProcessor.handleUnsubscribed.calledOnce.should.be.false(); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - - it('does not process if handleUnsubscribed is not overridden', async function () { - // test non-extended superclass instance - eventProcessor = new EventProcessor(); - - const result = await eventProcessor.process({ - type: 'unsubscribed', - emailId: 'testEmailId' - }); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - }); - - describe('complained', function () { - it('works', async function () { - const result = await eventProcessor.process({ - type: 'complained' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.calledOnce.should.be.false(); - eventProcessor.handleComplained.calledOnce.should.be.true(); - - result.should.deepEqual({ - complained: 1, - emailIds: ['emailId'] - }); - }); - - it('gets emailId and memberId directly from event if available', async function () { - const result = await eventProcessor.process({ - type: 'complained', - emailId: 'testEmailId', - memberId: 'testMemberId' - }); - - eventProcessor.getEmailId.called.should.be.false(); - eventProcessor.getMemberId.called.should.be.false(); - eventProcessor.handleComplained.calledOnce.should.be.true(); - - result.should.deepEqual({ - complained: 1, - emailIds: ['testEmailId'] - }); - }); - - it('does not process if email id is not found', async function () { - sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null)); - - const result = await eventProcessor.process({ - type: 'complained' - }); - - eventProcessor.getEmailId.calledOnce.should.be.true(); - eventProcessor.getMemberId.calledOnce.should.be.false(); - eventProcessor.handleComplained.calledOnce.should.be.false(); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - - it('does not process if handleComplained is not overridden', async function () { - // test non-extended superclass instance - eventProcessor = new EventProcessor(); - - const result = await eventProcessor.process({ - type: 'complained', - emailId: 'testEmailId' - }); - - result.should.deepEqual({ - unprocessable: 1 - }); - }); - }); -}); diff --git a/ghost/email-events/index.js b/ghost/email-events/index.js index 7ddb7d4965..758b871f7c 100644 --- a/ghost/email-events/index.js +++ b/ghost/email-events/index.js @@ -1,4 +1,7 @@ module.exports = { SpamComplaintEvent: require('./lib/SpamComplaintEvent'), - EmailBouncedEvent: require('./lib/EmailBouncedEvent') + EmailBouncedEvent: require('./lib/EmailBouncedEvent'), + EmailDeliveredEvent: require('./lib/EmailDeliveredEvent'), + EmailOpenedEvent: require('./lib/EmailOpenedEvent'), + EmailUnsubscribedEvent: require('./lib/EmailUnsubscribedEvent') }; diff --git a/ghost/email-events/lib/EmailBouncedEvent.js b/ghost/email-events/lib/EmailBouncedEvent.js index d3bba8dbdd..e6616c54d9 100644 --- a/ghost/email-events/lib/EmailBouncedEvent.js +++ b/ghost/email-events/lib/EmailBouncedEvent.js @@ -1,7 +1,3 @@ -/** - * @typedef {import('bson-objectid').default} ObjectID - */ - module.exports = class EmailBouncedEvent { /** * @readonly @@ -11,16 +7,22 @@ module.exports = class EmailBouncedEvent { /** * @readonly - * @type {ObjectID} + * @type {string} */ memberId; /** * @readonly - * @type {ObjectID} + * @type {string} */ emailId; + /** + * @readonly + * @type {string} + */ + emailRecipientId; + /** * @readonly * @type {Date} @@ -30,10 +32,11 @@ module.exports = class EmailBouncedEvent { /** * @private */ - constructor({email, memberId, emailId, timestamp}) { - this.email = email; + constructor({email, memberId, emailId, emailRecipientId, timestamp}) { this.memberId = memberId; this.emailId = emailId; + this.email = email; + this.emailRecipientId = emailRecipientId; this.timestamp = timestamp; } diff --git a/ghost/email-events/lib/EmailDeliveredEvent.js b/ghost/email-events/lib/EmailDeliveredEvent.js new file mode 100644 index 0000000000..4c07c3da89 --- /dev/null +++ b/ghost/email-events/lib/EmailDeliveredEvent.js @@ -0,0 +1,49 @@ +module.exports = class EmailDeliveredEvent { + /** + * @readonly + * @type {string} + */ + email; + + /** + * @readonly + * @type {string} + */ + memberId; + + /** + * @readonly + * @type {string} + */ + emailId; + + /** + * @readonly + * @type {string} + */ + emailRecipientId; + + /** + * @readonly + * @type {Date} + */ + timestamp; + + /** + * @private + */ + constructor({email, memberId, emailId, emailRecipientId, timestamp}) { + this.email = email; + this.memberId = memberId; + this.emailId = emailId; + this.emailRecipientId = emailRecipientId; + this.timestamp = timestamp; + } + + static create(data) { + return new EmailDeliveredEvent({ + ...data, + timestamp: data.timestamp || new Date + }); + } +}; diff --git a/ghost/email-events/lib/EmailOpenedEvent.js b/ghost/email-events/lib/EmailOpenedEvent.js new file mode 100644 index 0000000000..2e531bd02a --- /dev/null +++ b/ghost/email-events/lib/EmailOpenedEvent.js @@ -0,0 +1,49 @@ +module.exports = class EmailOpenedEvent { + /** + * @readonly + * @type {string} + */ + email; + + /** + * @readonly + * @type {string} + */ + memberId; + + /** + * @readonly + * @type {string} + */ + emailId; + + /** + * @readonly + * @type {string} + */ + emailRecipientId; + + /** + * @readonly + * @type {Date} + */ + timestamp; + + /** + * @private + */ + constructor({email, memberId, emailId, emailRecipientId, timestamp}) { + this.memberId = memberId; + this.emailId = emailId; + this.emailRecipientId = emailRecipientId; + this.email = email; + this.timestamp = timestamp; + } + + static create(data) { + return new EmailOpenedEvent({ + ...data, + timestamp: data.timestamp || new Date + }); + } +}; diff --git a/ghost/email-events/lib/EmailUnsubscribedEvent.js b/ghost/email-events/lib/EmailUnsubscribedEvent.js new file mode 100644 index 0000000000..d024d9132b --- /dev/null +++ b/ghost/email-events/lib/EmailUnsubscribedEvent.js @@ -0,0 +1,42 @@ +module.exports = class EmailUnsubscribedEvent { + /** + * @readonly + * @type {string} + */ + email; + + /** + * @readonly + * @type {string} + */ + memberId; + + /** + * @readonly + * @type {string} + */ + emailId; + + /** + * @readonly + * @type {Date} + */ + timestamp; + + /** + * @private + */ + constructor({email, memberId, emailId, timestamp}) { + this.memberId = memberId; + this.emailId = emailId; + this.email = email; + this.timestamp = timestamp; + } + + static create(data) { + return new EmailUnsubscribedEvent({ + ...data, + timestamp: data.timestamp || new Date + }); + } +}; diff --git a/ghost/email-events/lib/SpamComplaintEvent.js b/ghost/email-events/lib/SpamComplaintEvent.js index f16f647fa3..ee195da6d2 100644 --- a/ghost/email-events/lib/SpamComplaintEvent.js +++ b/ghost/email-events/lib/SpamComplaintEvent.js @@ -1,7 +1,3 @@ -/** - * @typedef {import('bson-objectid').default} ObjectID - */ - module.exports = class SpamComplaintEvent { /** * @readonly @@ -11,13 +7,13 @@ module.exports = class SpamComplaintEvent { /** * @readonly - * @type {ObjectID} + * @type {string} */ memberId; /** * @readonly - * @type {ObjectID} + * @type {string} */ emailId; @@ -31,9 +27,9 @@ module.exports = class SpamComplaintEvent { * @private */ constructor({email, memberId, emailId, timestamp}) { - this.email = email; this.memberId = memberId; this.emailId = emailId; + this.email = email; this.timestamp = timestamp; } diff --git a/ghost/email-events/test/lib/EmailBouncedEvent.test.js b/ghost/email-events/test/lib/EmailBouncedEvent.test.js index a93c875c36..e4681bc098 100644 --- a/ghost/email-events/test/lib/EmailBouncedEvent.test.js +++ b/ghost/email-events/test/lib/EmailBouncedEvent.test.js @@ -6,8 +6,9 @@ describe('EmailBouncedEvent', function () { it('exports a static create method to create instances', function () { const event = EmailBouncedEvent.create({ email: 'test@test.test', - memberId: new ObjectID(), - emailId: new ObjectID(), + memberId: new ObjectID().toHexString(), + emailId: new ObjectID().toHexString(), + emailRecipientId: new ObjectID().toHexString(), timestamp: new Date() }); assert(event instanceof EmailBouncedEvent); diff --git a/ghost/email-events/test/lib/EmailDeliveredEvent.test.js b/ghost/email-events/test/lib/EmailDeliveredEvent.test.js new file mode 100644 index 0000000000..8eb887b9a8 --- /dev/null +++ b/ghost/email-events/test/lib/EmailDeliveredEvent.test.js @@ -0,0 +1,16 @@ +const assert = require('assert'); +const ObjectID = require('bson-objectid').default; +const EmailDeliveredEvent = require('../../lib/EmailDeliveredEvent'); + +describe('EmailDeliveredEvent', function () { + it('exports a static create method to create instances', function () { + const event = EmailDeliveredEvent.create({ + email: 'test@test.test', + memberId: new ObjectID().toHexString(), + emailId: new ObjectID().toHexString(), + emailRecipientId: new ObjectID().toHexString(), + timestamp: new Date() + }); + assert(event instanceof EmailDeliveredEvent); + }); +}); diff --git a/ghost/email-events/test/lib/EmailOpenedEvent.test.js b/ghost/email-events/test/lib/EmailOpenedEvent.test.js new file mode 100644 index 0000000000..b22df59f80 --- /dev/null +++ b/ghost/email-events/test/lib/EmailOpenedEvent.test.js @@ -0,0 +1,16 @@ +const assert = require('assert'); +const ObjectID = require('bson-objectid').default; +const EmailOpenedEvent = require('../../lib/EmailOpenedEvent'); + +describe('EmailOpenedEvent', function () { + it('exports a static create method to create instances', function () { + const event = EmailOpenedEvent.create({ + email: 'test@test.test', + memberId: new ObjectID().toHexString(), + emailId: new ObjectID().toHexString(), + emailRecipientId: new ObjectID().toHexString(), + timestamp: new Date() + }); + assert(event instanceof EmailOpenedEvent); + }); +}); diff --git a/ghost/email-events/test/lib/EmailUnsubscribedEvent.test.js b/ghost/email-events/test/lib/EmailUnsubscribedEvent.test.js new file mode 100644 index 0000000000..e88e896477 --- /dev/null +++ b/ghost/email-events/test/lib/EmailUnsubscribedEvent.test.js @@ -0,0 +1,15 @@ +const assert = require('assert'); +const ObjectID = require('bson-objectid').default; +const EmailUnsubscribedEvent = require('../../lib/EmailUnsubscribedEvent'); + +describe('EmailUnsubscribedEvent', function () { + it('exports a static create method to create instances', function () { + const event = EmailUnsubscribedEvent.create({ + email: 'test@test.test', + memberId: new ObjectID().toHexString(), + emailId: new ObjectID().toHexString(), + timestamp: new Date() + }); + assert(event instanceof EmailUnsubscribedEvent); + }); +}); diff --git a/ghost/email-events/test/lib/SpamComplaintEvent.test.js b/ghost/email-events/test/lib/SpamComplaintEvent.test.js index 252131ff33..c67bead807 100644 --- a/ghost/email-events/test/lib/SpamComplaintEvent.test.js +++ b/ghost/email-events/test/lib/SpamComplaintEvent.test.js @@ -6,8 +6,8 @@ describe('SpamComplaintEvent', function () { it('exports a static create method to create instances', function () { const event = SpamComplaintEvent.create({ email: 'test@test.test', - memberId: new ObjectID(), - emailId: new ObjectID(), + memberId: new ObjectID().toHexString(), + emailId: new ObjectID().toHexString(), timestamp: new Date() }); assert(event instanceof SpamComplaintEvent); diff --git a/ghost/email-service/index.js b/ghost/email-service/index.js index 92f606a1ff..31ff58c747 100644 --- a/ghost/email-service/index.js +++ b/ghost/email-service/index.js @@ -4,5 +4,7 @@ module.exports = { EmailRenderer: require('./lib/email-renderer'), EmailSegmenter: require('./lib/email-segmenter'), SendingService: require('./lib/sending-service'), - BatchSendingService: require('./lib/batch-sending-service') + BatchSendingService: require('./lib/batch-sending-service'), + EmailEventProcessor: require('./lib/email-event-processor'), + EmailEventStorage: require('./lib/email-event-storage') }; diff --git a/ghost/email-service/lib/email-event-processor.js b/ghost/email-service/lib/email-event-processor.js new file mode 100644 index 0000000000..5fb61a5063 --- /dev/null +++ b/ghost/email-service/lib/email-event-processor.js @@ -0,0 +1,182 @@ +const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, SpamComplaintEvent, EmailUnsubscribedEvent} = require('@tryghost/email-events'); + +/** + * @typedef EmailIdentification + * @property {string} email + * @property {string} providerId + * @property {string} [emailId] Optional email id + */ + +/** + * @typedef EmailRecipientInformation + * @property {string} emailRecipientId + * @property {string} memberId + * @property {string} emailId + */ + +/** + * WARNING: this class is used in a separate thread (an offloaded job). Be careful when working with settings and models. + */ +class EmailEventProcessor { + #domainEvents; + #db; + + constructor({domainEvents, db}) { + this.#domainEvents = domainEvents; + this.#db = db; + + // Avoid having to query email_batch by provider_id for every event + this.providerIdEmailIdMap = {}; + } + + /** + * @param {EmailIdentification} emailIdentification + * @param {Date} timestamp + */ + async handleDelivered(emailIdentification, timestamp) { + const recipient = await this.getRecipient(emailIdentification); + if (recipient) { + this.#domainEvents.dispatch(EmailDeliveredEvent.create({ + email: emailIdentification.email, + emailRecipientId: recipient.emailRecipientId, + memberId: recipient.memberId, + emailId: recipient.emailId, + timestamp + })); + } + return recipient; + } + + /** + * @param {EmailIdentification} emailIdentification + * @param {Date} timestamp + */ + async handleOpened(emailIdentification, timestamp) { + const recipient = await this.getRecipient(emailIdentification); + if (recipient) { + this.#domainEvents.dispatch(EmailOpenedEvent.create({ + email: emailIdentification.email, + emailRecipientId: recipient.emailRecipientId, + memberId: recipient.memberId, + emailId: recipient.emailId, + timestamp + })); + } + return recipient; + } + + /** + * @param {EmailIdentification} emailIdentification + */ + async handleTemporaryFailed(emailIdentification) { + const recipient = await this.getRecipient(emailIdentification); + // TODO: store and emit event + return recipient; + } + + /** + * @param {EmailIdentification} emailIdentification + * @param {Date} timestamp + */ + async handlePermanentFailed(emailIdentification, timestamp) { + // TODO: also read error message + const recipient = await this.getRecipient(emailIdentification); + if (recipient) { + this.#domainEvents.dispatch(EmailBouncedEvent.create({ + email: emailIdentification.email, + memberId: recipient.memberId, + emailId: recipient.emailId, + emailRecipientId: recipient.emailRecipientId, + timestamp + })); + } + return recipient; + } + + /** + * @param {EmailIdentification} emailIdentification + * @param {Date} timestamp + */ + async handleUnsubscribed(emailIdentification, timestamp) { + const recipient = await this.getRecipient(emailIdentification); + if (recipient) { + this.#domainEvents.dispatch(EmailUnsubscribedEvent.create({ + email: emailIdentification.email, + memberId: recipient.memberId, + emailId: recipient.emailId, + timestamp + })); + } + return recipient; + } + + /** + * @param {EmailIdentification} emailIdentification + * @param {Date} timestamp + */ + async handleComplained(emailIdentification, timestamp) { + const recipient = await this.getRecipient(emailIdentification); + if (recipient) { + this.#domainEvents.dispatch(SpamComplaintEvent.create({ + email: emailIdentification.email, + memberId: recipient.memberId, + emailId: recipient.emailId, + timestamp + })); + } + return recipient; + } + + /** + * @private + * @param {EmailIdentification} emailIdentification + * @returns {Promise} + */ + async getRecipient(emailIdentification) { + // With the provider_id and email address we can look for the EmailRecipient + const emailId = emailIdentification.emailId ?? await this.getEmailId(emailIdentification.providerId); + if (!emailId) { + // Invalid + return; + } + + const {id: emailRecipientId, member_id: memberId} = await this.#db.knex('email_recipients') + .select('id', 'member_id') + .where('member_email', emailIdentification.email) + .where('email_id', emailId) + .first() || {}; + + if (emailRecipientId && memberId) { + return { + emailRecipientId, + memberId, + emailId + }; + } + } + + /** + * @private + * @param {string} providerId + * @returns {Promise} + */ + async getEmailId(providerId) { + if (this.providerIdEmailIdMap[providerId]) { + return this.providerIdEmailIdMap[providerId]; + } + + const {emailId} = await this.#db.knex('email_batches') + .select('email_id as emailId') + .where('provider_id', providerId) + .first() || {}; + + if (!emailId) { + return; + } + + this.providerIdEmailIdMap[providerId] = emailId; + return emailId; + } +} + +module.exports = EmailEventProcessor; diff --git a/ghost/email-service/lib/email-event-storage.js b/ghost/email-service/lib/email-event-storage.js new file mode 100644 index 0000000000..43da6d6639 --- /dev/null +++ b/ghost/email-service/lib/email-event-storage.js @@ -0,0 +1,72 @@ +const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, EmailUnsubscribedEvent, SpamComplaintEvent} = require('@tryghost/email-events'); +const moment = require('moment-timezone'); + +class EmailEventStorage { + #db; + #membersRepository; + + constructor({db, membersRepository}) { + this.#db = db; + this.#membersRepository = membersRepository; + } + + listen(domainEvents) { + domainEvents.subscribe(EmailDeliveredEvent, async (event) => { + await this.handleDelivered(event); + }); + + domainEvents.subscribe(EmailOpenedEvent, async (event) => { + await this.handleOpened(event); + }); + + domainEvents.subscribe(EmailBouncedEvent, async (event) => { + await this.handlePermanentFailed(event); + }); + + domainEvents.subscribe(EmailUnsubscribedEvent, async (event) => { + await this.handleUnsubscribed(event); + }); + + domainEvents.subscribe(SpamComplaintEvent, async (event) => { + await this.handleComplained(event); + }); + } + + async handleDelivered(event) { + await this.#db.knex('email_recipients') + .where('id', '=', event.emailRecipientId) + .update({ + delivered_at: this.#db.knex.raw('COALESCE(delivered_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')]) + }); + } + + async handleOpened(event) { + await this.#db.knex('email_recipients') + .where('id', '=', event.emailRecipientId) + .update({ + opened_at: this.#db.knex.raw('COALESCE(opened_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')]) + }); + } + + async handlePermanentFailed(event) { + await this.#db.knex('email_recipients') + .where('id', '=', event.emailRecipientId) + .update({ + failed_at: this.#db.knex.raw('COALESCE(failed_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')]) + }); + } + + async handleUnsubscribed(event) { + return this.unsubscribeFromNewsletters(event); + } + + async handleComplained(event) { + return this.unsubscribeFromNewsletters(event); + } + + async unsubscribeFromNewsletters(event) { + await this.#membersRepository.update({newsletters: []}, {id: event.memberId}); + } +} + +module.exports = EmailEventStorage; diff --git a/ghost/email-service/package.json b/ghost/email-service/package.json index 332ea6d025..39220de558 100644 --- a/ghost/email-service/package.json +++ b/ghost/email-service/package.json @@ -26,6 +26,8 @@ "dependencies": { "@tryghost/errors": "1.2.18", "@tryghost/tpl": "0.1.19", - "bson-objectid": "2.0.4" + "bson-objectid": "2.0.4", + "@tryghost/email-events": "0.0.0", + "moment-timezone": "0.5.23" } } diff --git a/ghost/job-manager/lib/job-manager.js b/ghost/job-manager/lib/job-manager.js index 2b1be5f511..a2878386c8 100644 --- a/ghost/job-manager/lib/job-manager.js +++ b/ghost/job-manager/lib/job-manager.js @@ -28,16 +28,20 @@ const ALL_STATUSES = { }; class JobManager { + #domainEvents; + /** * @param {Object} options * @param {Function} [options.errorHandler] - custom job error handler * @param {Function} [options.workerMessageHandler] - custom message handler coming from workers * @param {Object} [options.JobModel] - a model which can persist job data in the storage + * @param {Object} [options.domainEvents] - domain events emitter */ - constructor({errorHandler, workerMessageHandler, JobModel}) { + constructor({errorHandler, workerMessageHandler, JobModel, domainEvents}) { this.queue = fastq(this, worker, 1); this._jobMessageHandler = this._jobMessageHandler.bind(this); this._jobErrorHandler = this._jobErrorHandler.bind(this); + this.#domainEvents = domainEvents; const combinedMessageHandler = workerMessageHandler ? ({name, message}) => { @@ -109,6 +113,13 @@ class JobManager { finished_at: new Date() }); } + } else { + if (typeof message === 'object' && this.#domainEvents) { + // Is this an event? + if (message.event) { + this.#domainEvents.dispatchRaw(message.event.type, message.event.data); + } + } } } } diff --git a/ghost/members-events-service/lib/last-seen-at-updater.js b/ghost/members-events-service/lib/last-seen-at-updater.js index 03094e4dfd..ee92b03aaa 100644 --- a/ghost/members-events-service/lib/last-seen-at-updater.js +++ b/ghost/members-events-service/lib/last-seen-at-updater.js @@ -1,6 +1,7 @@ const {MemberPageViewEvent, MemberCommentEvent, MemberLinkClickEvent} = require('@tryghost/member-events'); const moment = require('moment-timezone'); const {IncorrectUsageError} = require('@tryghost/errors'); +const {EmailOpenedEvent} = require('@tryghost/email-events'); /** * Listen for `MemberViewEvent` to update the `member.last_seen_at` timestamp @@ -42,6 +43,10 @@ class LastSeenAtUpdater { domainEvents.subscribe(MemberCommentEvent, async (event) => { await this.updateLastCommentedAt(event.data.memberId, event.timestamp); }); + + domainEvents.subscribe(EmailOpenedEvent, async (event) => { + await this.updateLastSeenAtWithoutKnownLastSeen(event.memberId, event.timestamp); + }); } /** @@ -50,13 +55,29 @@ class LastSeenAtUpdater { * - memberLastSeenAt is 2022-02-27 23:00:00, timestamp is current time, then `last_seen_at` is set to the current time * - memberLastSeenAt is 2022-02-28 01:00:00, timestamp is current time, then `last_seen_at` isn't changed * @param {string} memberId The id of the member to be udpated - * @param {string} memberLastSeenAt The previous last_seen_at property value for the current member + * @param {Date} timestamp The event timestamp + */ + async updateLastSeenAtWithoutKnownLastSeen(memberId, timestamp) { + // Fetch manually + const membersApi = this._getMembersApi(); + const member = await membersApi.members.get({id: memberId}, {require: true}); + const memberLastSeenAt = member.get('last_seen_at'); + await this.updateLastSeenAt(memberId, memberLastSeenAt, timestamp); + } + + /** + * Updates the member.last_seen_at field if it wasn't updated in the current day yet (in the publication timezone) + * Example: current time is 2022-02-28 18:00:00 + * - memberLastSeenAt is 2022-02-27 23:00:00, timestamp is current time, then `last_seen_at` is set to the current time + * - memberLastSeenAt is 2022-02-28 01:00:00, timestamp is current time, then `last_seen_at` isn't changed + * @param {string} memberId The id of the member to be udpated + * @param {string|null} memberLastSeenAt The previous last_seen_at property value for the current member * @param {Date} timestamp The event timestamp */ async updateLastSeenAt(memberId, memberLastSeenAt, timestamp) { const timezone = this._settingsCacheService.get('timezone'); if (memberLastSeenAt === null || moment(moment.utc(timestamp).tz(timezone).startOf('day')).isAfter(memberLastSeenAt)) { - const membersApi = await this._getMembersApi(); + const membersApi = this._getMembersApi(); await membersApi.members.update({ last_seen_at: moment.utc(timestamp).format('YYYY-MM-DD HH:mm:ss') }, { @@ -74,7 +95,7 @@ class LastSeenAtUpdater { * @param {Date} timestamp The event timestamp */ async updateLastCommentedAt(memberId, timestamp) { - const membersApi = await this._getMembersApi(); + const membersApi = this._getMembersApi(); const member = await membersApi.members.get({id: memberId}, {require: true}); const timezone = this._settingsCacheService.get('timezone'); diff --git a/ghost/members-events-service/test/last-seen-at-updater.test.js b/ghost/members-events-service/test/last-seen-at-updater.test.js index a3cd44145c..e80185da19 100644 --- a/ghost/members-events-service/test/last-seen-at-updater.test.js +++ b/ghost/members-events-service/test/last-seen-at-updater.test.js @@ -8,6 +8,13 @@ const {LastSeenAtUpdater} = require('../'); const DomainEvents = require('@tryghost/domain-events'); const {MemberPageViewEvent, MemberCommentEvent} = require('@tryghost/member-events'); const moment = require('moment'); +const {EmailOpenedEvent} = require('@tryghost/email-events'); + +async function sleep(ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} describe('LastSeenAtUpdater', function () { it('Calls updateLastSeenAt on MemberPageViewEvents', async function () { @@ -21,7 +28,7 @@ describe('LastSeenAtUpdater', function () { get: settingsCache } }, - async getMembersApi() { + getMembersApi() { return { members: { update: stub @@ -35,6 +42,43 @@ describe('LastSeenAtUpdater', function () { assert(updater.updateLastSeenAt.calledOnceWithExactly('1', previousLastSeen, now.toDate())); }); + it('Calls updateLastSeenAt on email opened events', async function () { + const now = moment('2022-02-28T18:00:00Z').utc(); + const previousLastSeen = moment('2022-02-27T23:00:00Z').toISOString(); + const stub = sinon.stub().resolves(); + const getStub = sinon.stub().resolves({ + get() { + return previousLastSeen; + } + }); + const settingsCache = sinon.stub().returns('Etc/UTC'); + const updater = new LastSeenAtUpdater({ + services: { + settingsCache: { + get: settingsCache + } + }, + getMembersApi() { + return { + members: { + update: stub, + get: getStub + } + }; + } + }); + updater.subscribe(DomainEvents); + sinon.spy(updater, 'updateLastSeenAt'); + sinon.spy(updater, 'updateLastSeenAtWithoutKnownLastSeen'); + DomainEvents.dispatch(EmailOpenedEvent.create({memberId: '1', emailRecipientId: '1', emailId: '1', timestamp: now.toDate()})); + // Wait for next tick + await sleep(50); + assert(updater.updateLastSeenAt.calledOnceWithExactly('1', previousLastSeen, now.toDate())); + assert(updater.updateLastSeenAtWithoutKnownLastSeen.calledOnceWithExactly('1', now.toDate())); + assert(getStub.calledOnce); + assert(stub.calledOnce); + }); + it('Calls updateLastCommentedAt on MemberCommentEvents', async function () { const now = moment('2022-02-28T18:00:00Z').utc(); const stub = sinon.stub().resolves(); @@ -45,7 +89,7 @@ describe('LastSeenAtUpdater', function () { get: settingsCache } }, - async getMembersApi() { + getMembersApi() { return { members: { update: stub @@ -70,7 +114,7 @@ describe('LastSeenAtUpdater', function () { get: settingsCache } }, - async getMembersApi() { + getMembersApi() { return { members: { update: stub @@ -93,7 +137,7 @@ describe('LastSeenAtUpdater', function () { get: settingsCache } }, - async getMembersApi() { + getMembersApi() { return { members: { update: stub, @@ -124,7 +168,7 @@ describe('LastSeenAtUpdater', function () { get: settingsCache } }, - async getMembersApi() { + getMembersApi() { return { members: { update: stub @@ -151,7 +195,7 @@ describe('LastSeenAtUpdater', function () { get: settingsCache } }, - async getMembersApi() { + getMembersApi() { return { members: { update: stub @@ -174,7 +218,7 @@ describe('LastSeenAtUpdater', function () { get: settingsCache } }, - async getMembersApi() { + getMembersApi() { return { members: { update: stub, @@ -205,7 +249,7 @@ describe('LastSeenAtUpdater', function () { get: settingsCache } }, - async getMembersApi() { + getMembersApi() { return { members: { update: stub, @@ -236,7 +280,7 @@ describe('LastSeenAtUpdater', function () { get: settingsCache } }, - async getMembersApi() { + getMembersApi() { return { members: { update: stub, @@ -274,7 +318,7 @@ describe('LastSeenAtUpdater', function () { get: settingsCache } }, - async getMembersApi() { + getMembersApi() { return { members: { update: stub, @@ -311,7 +355,7 @@ describe('LastSeenAtUpdater', function () { get: settingsCache } }, - async getMembersApi() { + getMembersApi() { return { members: { update: stub