Added new email event processor (#15879)

fixes https://github.com/TryGhost/Team/issues/2310

This moves the processing of the events from the event-processor to a
new email-event-processor in the email-service package.

- The `EmailEventProcessor` only translates events from
providerId/emailId to their known emailId, memberId and recipientId, and
dispatches the corresponding events.
- Since `EmailEventProcessor` runs in a separate worker thread, we can't
listen for the dispatched events on the main thread. To accomplish this
communication, the events dispatched from the `EmailEventProcessor`
class are 'posted' via the postMessage method and redispatched on the
main thread.
- A new `EmailEventStorage` class reacts to the email events and stores
it in the database. This code mostly corresponds to the (now deleted)
subclass of the old `EmailEventProcessor`
- Updating a members last_seen_at timestamp has moved to the
lastSeenAtUpdater.
- Email events no longer store `ObjectID` because these are not
encodable across threads via postMessage
- Includes new E2E tests that test the storage of all supported Mailgun
events. Note that in these tests we run the processing on the main
thread instead of on a separate thread (couldn't do this because
stubbing is not possible across threads)

There are some missing pieces that will get added in later PRs (this PR
focuses on porting the existing functionality):
- Handling temporary failures/bounces
- Capturing the error messages of bounce events
This commit is contained in:
Simon Backx 2022-11-29 11:15:19 +01:00 committed by GitHub
parent a95cc4e7ac
commit f4fdb4fa6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 1252 additions and 921 deletions

View File

@ -2,14 +2,20 @@ const config = require('../../../shared/config');
const db = require('../../data/db');
const settings = require('../../../shared/settings-cache');
const {EmailAnalyticsService} = require('@tryghost/email-analytics-service');
const EventProcessor = require('./lib/event-processor');
const {EmailEventProcessor} = require('@tryghost/email-service');
const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun');
const queries = require('./lib/queries');
const DomainEvents = require('@tryghost/domain-events');
const eventProcessor = new EmailEventProcessor({
domainEvents: DomainEvents,
db
});
module.exports = new EmailAnalyticsService({
config,
settings,
eventProcessor: new EventProcessor({db}),
eventProcessor,
providers: [
new MailgunProvider({config, settings})
],

View File

@ -0,0 +1,50 @@
const {parentPort} = require('worker_threads');
// recurring job to fetch analytics since the most recently seen event timestamp
// Exit early when cancelled to prevent stalling shutdown. No cleanup needed when cancelling as everything is idempotent and will pick up
// where it left off on next run
function cancel() {
if (parentPort) {
parentPort.postMessage('Email analytics fetch-latest job cancelled before completion');
parentPort.postMessage('cancelled');
} else {
setTimeout(() => {
process.exit(0);
}, 1000);
}
}
if (parentPort) {
parentPort.once('message', (message) => {
if (message === 'cancel') {
return cancel();
}
});
}
(async () => {
const {run} = require('./run');
const {eventStats, aggregateEndDate, fetchStartDate} = await run({
domainEvents: {
dispatch(event) {
parentPort.postMessage({
event: {
type: event.constructor.name,
data: event
}
});
}
}
});
if (parentPort) {
parentPort.postMessage(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);
parentPort.postMessage('done');
} else {
// give the logging pipes time finish writing before exit
setTimeout(() => {
process.exit(0);
}, 1000);
}
})();

View File

@ -1,32 +1,8 @@
const {parentPort} = require('worker_threads');
const debug = require('@tryghost/debug')('jobs:email-analytics:fetch-latest');
// recurring job to fetch analytics since the most recently seen event timestamp
// Exit early when cancelled to prevent stalling shutdown. No cleanup needed when cancelling as everything is idempotent and will pick up
// where it left off on next run
function cancel() {
if (parentPort) {
parentPort.postMessage('Email analytics fetch-latest job cancelled before completion');
parentPort.postMessage('cancelled');
} else {
setTimeout(() => {
process.exit(0);
}, 1000);
}
}
if (parentPort) {
parentPort.once('message', (message) => {
if (message === 'cancel') {
return cancel();
}
});
}
(async () => {
const config = require('../../../../shared/config');
const db = require('../../../data/db');
async function run({domainEvents}) {
const config = require('../../../../../shared/config');
const db = require('../../../../data/db');
const settingsRows = await db.knex('settings')
.whereIn('key', ['mailgun_api_key', 'mailgun_domain', 'mailgun_base_url']);
@ -44,14 +20,21 @@ if (parentPort) {
};
const {EmailAnalyticsService} = require('@tryghost/email-analytics-service');
const EventProcessor = require('../lib/event-processor');
const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun');
const queries = require('../lib/queries');
const queries = require('../../lib/queries');
const {EmailEventProcessor} = require('@tryghost/email-service');
// Since this is running in a worker thread, we cant dispatch directly
// So we post the events as a message to the job manager
const eventProcessor = new EmailEventProcessor({
domainEvents,
db
});
const emailAnalyticsService = new EmailAnalyticsService({
config,
settings,
eventProcessor: new EventProcessor({db}),
eventProcessor,
providers: [
new MailgunProvider({config, settings})
],
@ -69,14 +52,6 @@ if (parentPort) {
await emailAnalyticsService.aggregateStats(eventStats);
const aggregateEndDate = new Date();
debug(`Finished aggregating email analytics in ${aggregateEndDate - aggregateStartDate}ms`);
if (parentPort) {
parentPort.postMessage(`Fetched ${eventStats.totalEvents} events and aggregated stats for ${eventStats.emailIds.length} emails in ${aggregateEndDate - fetchStartDate}ms`);
parentPort.postMessage('done');
} else {
// give the logging pipes time finish writing before exit
setTimeout(() => {
process.exit(0);
}, 1000);
}
})();
return {eventStats, fetchStartDate, fetchEndDate, aggregateStartDate, aggregateEndDate};
}
module.exports.run = run;

View File

@ -30,7 +30,7 @@ module.exports = {
jobsService.addJob({
at: `${s} ${m}/5 * * * *`,
job: path.resolve(__dirname, 'fetch-latest.js'),
job: path.resolve(__dirname, 'fetch-latest/index.js'),
name: 'email-analytics-fetch-latest'
});

View File

@ -1,178 +0,0 @@
const {EventProcessor} = require('@tryghost/email-analytics-service');
const {default: ObjectID} = require('bson-objectid');
const moment = require('moment-timezone');
class GhostEventProcessor extends EventProcessor {
constructor({db}) {
super(...arguments);
this.db = db;
// avoid having to query email_batch by provider_id for every event
this.providerIdEmailIdMap = {};
}
async getEmailId(event) {
if (event.emailId) {
return event.emailId;
}
if (event.providerId) {
if (this.providerIdEmailIdMap[event.providerId]) {
return this.providerIdEmailIdMap[event.providerId];
}
const {emailId} = await this.db.knex('email_batches')
.select('email_id as emailId')
.where('provider_id', event.providerId)
.first() || {};
if (!emailId) {
return;
}
this.providerIdEmailIdMap[event.providerId] = emailId;
return emailId;
}
return undefined;
}
async getMemberId(event) {
const emailId = await this.getEmailId(event);
if (!emailId) {
return false;
}
if (emailId && event.recipientEmail) {
const {memberId} = await this.db.knex('email_recipients')
.select('member_id as memberId')
.where('member_email', event.recipientEmail)
.where('email_id', emailId)
.first() || {};
return memberId;
}
return undefined;
}
async handleDelivered(event) {
const emailId = await this.getEmailId(event);
if (!emailId) {
return false;
}
const updateResult = await this.db.knex('email_recipients')
.where('email_id', '=', emailId)
.where('member_email', '=', event.recipientEmail)
.update({
delivered_at: this.db.knex.raw('COALESCE(delivered_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
});
return updateResult > 0;
}
async handleOpened(event) {
const emailId = await this.getEmailId(event);
if (!emailId) {
return false;
}
const updateResult = await this.db.knex('email_recipients')
.where('email_id', '=', emailId)
.where('member_email', '=', event.recipientEmail)
.update({
opened_at: this.db.knex.raw('COALESCE(opened_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
});
// Using the default timezone set in https://github.com/TryGhost/Ghost/blob/2c5643623db0fc4db390f6997c81a73dca7ccacd/core/server/data/schema/default-settings/default-settings.json#L105
let timezone = 'Etc/UTC';
const timezoneData = await this.db.knex('settings').first('value').where('key', 'timezone');
if (timezoneData && timezoneData.value) {
timezone = timezoneData.value;
}
await this.db.knex('members')
.where('email', '=', event.recipientEmail)
.andWhere(builder => builder
.where('last_seen_at', '<', moment.utc(event.timestamp).tz(timezone).startOf('day').utc().format('YYYY-MM-DD HH:mm:ss'))
.orWhereNull('last_seen_at')
)
.update({
last_seen_at: moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')
});
return updateResult > 0;
}
async handleTemporaryFailed(/*event*/) {
// noop - we don't do anything with temporary failures for now
}
async handlePermanentFailed(event) {
const emailId = await this.getEmailId(event);
if (!emailId) {
return false;
}
const updateResult = await this.db.knex('email_recipients')
.where('email_id', '=', emailId)
.where('member_email', '=', event.recipientEmail)
.update({
failed_at: this.db.knex.raw('COALESCE(failed_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
});
return updateResult > 0;
}
async handleUnsubscribed(event) {
return this._unsubscribeFromNewsletters(event);
}
async handleComplained(event) {
return this._unsubscribeFromNewsletters(event);
}
async _unsubscribeFromNewsletters(event) {
const memberId = await this.getMemberId(event);
if (!memberId) {
return false;
}
const subscribedNewsletterIds = await this.db.knex('members_newsletters')
.where('member_id', '=', memberId)
.pluck('newsletter_id');
await this.db.knex('members_newsletters')
.where('member_id', '=', memberId)
.del();
const nowUTC = moment.utc().toDate();
for (const newsletterId of subscribedNewsletterIds) {
await this.db.knex('members_subscribe_events').insert({
id: ObjectID().toHexString(),
member_id: memberId,
newsletter_id: newsletterId,
subscribed: false,
created_at: nowUTC,
source: 'member'
});
}
const updateResult = await this.db.knex('members')
.where('id', '=', memberId)
.update({
updated_at: moment.utc().toDate()
});
return updateResult > 0;
}
}
module.exports = GhostEventProcessor;

View File

@ -3,7 +3,11 @@ const ObjectID = require('bson-objectid').default;
class EmailServiceWrapper {
init() {
const {EmailService, EmailController, EmailRenderer, SendingService, BatchSendingService, EmailSegmenter} = require('@tryghost/email-service');
if (this.service) {
return;
}
const {EmailService, EmailController, EmailRenderer, SendingService, BatchSendingService, EmailSegmenter, EmailEventStorage} = require('@tryghost/email-service');
const {Post, Newsletter, Email, EmailBatch, EmailRecipient, Member} = require('../../models');
const settingsCache = require('../../../shared/settings-cache');
const jobsService = require('../jobs');
@ -11,6 +15,7 @@ class EmailServiceWrapper {
const db = require('../../data/db');
const membersRepository = membersService.api.members;
const limitService = require('../limits');
const domainEvents = require('@tryghost/domain-events');
const emailRenderer = new EmailRenderer();
const sendingService = new SendingService({
@ -58,6 +63,12 @@ class EmailServiceWrapper {
Email
}
});
this.eventStorage = new EmailEventStorage({
db,
membersRepository
});
this.eventStorage.listen(domainEvents);
}
}

View File

@ -7,6 +7,7 @@ const JobManager = require('@tryghost/job-manager');
const logging = require('@tryghost/logging');
const models = require('../../models');
const sentry = require('../../../shared/sentry');
const domainEvents = require('@tryghost/domain-events');
const errorHandler = (error, workerMeta) => {
logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`);
@ -15,7 +16,9 @@ const errorHandler = (error, workerMeta) => {
};
const workerMessageHandler = ({name, message}) => {
logging.info(`Worker for job ${name} sent a message: ${message}`);
if (typeof message === 'string') {
logging.info(`Worker for job ${name} sent a message: ${message}`);
}
};
const initTestMode = () => {
@ -39,7 +42,7 @@ const initTestMode = () => {
}, 5000);
};
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job});
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents});
module.exports = jobManager;
module.exports.initTestMode = initTestMode;

View File

@ -27,7 +27,7 @@ class MembersEventsServiceWrapper {
services: {
settingsCache
},
async getMembersApi() {
getMembersApi() {
return members.api;
}
});

View File

@ -0,0 +1,438 @@
const sinon = require('sinon');
const {agentProvider, fixtureManager, mockManager} = require('../../../utils/e2e-framework');
const assert = require('assert');
const models = require('../../../../core/server/models');
const domainEvents = require('@tryghost/domain-events');
const MailgunClient = require('@tryghost/mailgun-client');
const {run} = require('../../../../core/server/services/email-analytics/jobs/fetch-latest/run.js');
const membersService = require('../../../../core/server/services/members');
async function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
// Test the whole E2E flow from Mailgun events -> handling and storage
describe('EmailEventStorage', function () {
let _mailgunClient;
let agent;
let events = [];
let jobsService;
before(async function () {
agent = await agentProvider.getAdminAPIAgent();
await fixtureManager.init('newsletters', 'members:newsletters', 'members:emails');
await agent.loginAsOwner();
// Only create reference to jobsService after Ghost boot
jobsService = require('../../../../core/server/services/jobs');
sinon.stub(MailgunClient.prototype, 'fetchEvents').callsFake(async function (_, batchHandler) {
const normalizedEvents = events.map(this.normalizeEvent) || [];
return [await batchHandler(normalizedEvents)];
});
});
it('Can handle delivered events', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
const emailRecipient = fixtureManager.get('email_recipients', 0);
assert(emailRecipient.batch_id === emailBatch.id);
const memberId = emailRecipient.member_id;
const providerId = emailBatch.provider_id;
const timestamp = new Date(2000, 0, 1);
events = [{
event: 'delivered',
recipient: emailRecipient.member_email,
'user-variables': {
'email-id': emailId
},
message: {
headers: {
'message-id': providerId
}
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000)
}];
const initialModel = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
assert.equal(initialModel.get('delivered_at'), null);
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
assert.equal(result.delivered, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
// Now wait for events processed
await sleep(100);
// Check if status has changed to delivered, with correct timestamp
const updatedEmailRecipient = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
assert.equal(updatedEmailRecipient.get('delivered_at').toUTCString(), timestamp.toUTCString());
});
it('Can handle delivered events without user variables', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
const emailRecipient = fixtureManager.get('email_recipients', 0);
assert(emailRecipient.batch_id === emailBatch.id);
const memberId = emailRecipient.member_id;
const providerId = emailBatch.provider_id;
const timestamp = new Date(2000, 0, 1);
// Reset
await models.EmailRecipient.edit({delivered_at: null}, {
id: emailRecipient.id
});
events = [{
event: 'delivered',
recipient: emailRecipient.member_email,
'user-variables': {},
message: {
headers: {
'message-id': providerId
}
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000)
}];
const initialModel = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
assert.equal(initialModel.get('delivered_at'), null);
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
assert.equal(result.delivered, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
// Now wait for events processed
await sleep(100);
// Check if status has changed to delivered, with correct timestamp
const updatedEmailRecipient = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
assert.equal(updatedEmailRecipient.get('delivered_at').toUTCString(), timestamp.toUTCString());
});
it('Can handle opened events', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
const emailRecipient = fixtureManager.get('email_recipients', 0);
assert(emailRecipient.batch_id === emailBatch.id);
const memberId = emailRecipient.member_id;
const providerId = emailBatch.provider_id;
const timestamp = new Date(2000, 0, 1);
events = [{
event: 'opened',
recipient: emailRecipient.member_email,
'user-variables': {
'email-id': emailId
},
message: {
headers: {
'message-id': providerId
}
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000)
}];
const initialModel = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
assert.equal(initialModel.get('opened_at'), null);
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
assert.equal(result.opened, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
// Now wait for events processed
await sleep(100);
// Check if status has changed to delivered, with correct timestamp
const updatedEmailRecipient = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
assert.equal(updatedEmailRecipient.get('opened_at').toUTCString(), timestamp.toUTCString());
// TODO: check last seen at
});
it('Can handle permanent failure events', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
const emailRecipient = fixtureManager.get('email_recipients', 0);
assert(emailRecipient.batch_id === emailBatch.id);
const memberId = emailRecipient.member_id;
const providerId = emailBatch.provider_id;
const timestamp = new Date(2000, 0, 1);
events = [{
event: 'failed',
severity: 'permanent',
recipient: emailRecipient.member_email,
'user-variables': {
'email-id': emailId
},
message: {
headers: {
'message-id': providerId
}
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000)
}];
const initialModel = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
assert.equal(initialModel.get('failed_at'), null);
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
// Now wait for events processed
await sleep(200);
// Check if status has changed to delivered, with correct timestamp
const updatedEmailRecipient = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
assert.equal(updatedEmailRecipient.get('failed_at').toUTCString(), timestamp.toUTCString());
});
it('Can handle tempoary failure events', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
const emailRecipient = fixtureManager.get('email_recipients', 0);
assert(emailRecipient.batch_id === emailBatch.id);
const memberId = emailRecipient.member_id;
const providerId = emailBatch.provider_id;
const timestamp = new Date(2000, 0, 1);
// Reset
await models.EmailRecipient.edit({failed_at: null}, {
id: emailRecipient.id
});
events = [{
event: 'failed',
severity: 'temporary',
recipient: emailRecipient.member_email,
'user-variables': {
'email-id': emailId
},
message: {
headers: {
'message-id': providerId
}
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000)
}];
const initialModel = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
assert.equal(initialModel.get('failed_at'), null);
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
assert.equal(result.temporaryFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
// Now wait for events processed
await sleep(200);
// Check if status has changed to delivered, with correct timestamp
const updatedEmailRecipient = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
// Not mark as failed
assert.equal(initialModel.get('failed_at'), null);
});
it('Can handle complaint events', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
const emailRecipient = fixtureManager.get('email_recipients', 0);
assert(emailRecipient.batch_id === emailBatch.id);
const memberId = emailRecipient.member_id;
const providerId = emailBatch.provider_id;
const timestamp = new Date(2000, 0, 1);
// Check not unsubscribed
const memberInitial = await membersService.api.members.get({id: memberId}, {withRelated: ['newsletters']});
assert.notEqual(memberInitial.related('newsletters').length, 0, 'This test requires a member that is subscribed to at least one newsletter');
events = [{
event: 'complained',
recipient: emailRecipient.member_email,
'user-variables': {
'email-id': emailId
},
message: {
headers: {
'message-id': providerId
}
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000)
}];
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
assert.equal(result.complained, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
// Now wait for events processed
await sleep(200);
// Check if unsubscribed
const member = await membersService.api.members.get({id: memberId}, {withRelated: ['newsletters']});
assert.equal(member.related('newsletters').length, 0);
});
it('Can handle unsubscribe events', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
const emailRecipient = fixtureManager.get('email_recipients', 0);
assert(emailRecipient.batch_id === emailBatch.id);
const memberId = emailRecipient.member_id;
const providerId = emailBatch.provider_id;
const timestamp = new Date(2000, 0, 1);
// Reset
await membersService.api.members.update({newsletters: [
{
id: fixtureManager.get('newsletters', 0).id
}
]}, {id: memberId});
// Check not unsubscribed
const memberInitial = await membersService.api.members.get({id: memberId}, {withRelated: ['newsletters']});
assert.notEqual(memberInitial.related('newsletters').length, 0, 'This test requires a member that is subscribed to at least one newsletter');
events = [{
event: 'unsubscribed',
recipient: emailRecipient.member_email,
'user-variables': {
'email-id': emailId
},
message: {
headers: {
'message-id': providerId
}
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000)
}];
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
assert.equal(result.unsubscribed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
// Now wait for events processed
await sleep(200);
// Check if unsubscribed
const member = await membersService.api.members.get({id: memberId}, {withRelated: ['newsletters']});
assert.equal(member.related('newsletters').length, 0);
});
it('Can handle unknown events', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
const emailRecipient = fixtureManager.get('email_recipients', 0);
assert(emailRecipient.batch_id === emailBatch.id);
const memberId = emailRecipient.member_id;
const providerId = emailBatch.provider_id;
const timestamp = new Date(2000, 0, 1);
events = [{
event: 'ceci-nest-pas-un-event',
recipient: emailRecipient.member_email,
'user-variables': {
'email-id': emailId
},
message: {
headers: {
'message-id': providerId
}
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000)
}];
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
assert.equal(result.unhandled, 1);
assert.deepEqual(result.emailIds, []);
assert.deepEqual(result.memberIds, []);
});
});

View File

@ -37,7 +37,18 @@ class DomainEvents {
* @returns {void}
*/
static dispatch(event) {
DomainEvents.ee.emit(event.constructor.name, event);
DomainEvents.dispatchRaw(event.constructor.name, event);
}
/**
* Dispatch an event in case you don't have an instance of the event class, but you do have the event name and event data.
* @template Data
* @param {string} name
* @param {Data} data
* @returns {void}
*/
static dispatchRaw(name, data) {
DomainEvents.ee.emit(name, data);
}
}

View File

@ -1,5 +1,4 @@
module.exports = {
EmailAnalyticsService: require('./lib/email-analytics-service'),
EventProcessingResult: require('./lib/event-processing-result'),
EventProcessor: require('./lib/event-processor')
EventProcessingResult: require('./lib/event-processing-result')
};

View File

@ -1,8 +1,22 @@
const EventProcessingResult = require('./event-processing-result');
const debug = require('@tryghost/debug')('services:email-analytics');
/**
* @typedef {import('@tryghost/email-service').EmailEventProcessor} EmailEventProcessor
*/
module.exports = class EmailAnalyticsService {
constructor({config, settings, queries, eventProcessor, providers} = {}) {
config;
settings;
queries;
eventProcessor;
providers;
/**
* @param {object} dependencies
* @param {EmailEventProcessor} dependencies.eventProcessor
*/
constructor({config, settings, queries, eventProcessor, providers}) {
this.config = config;
this.settings = settings;
this.queries = queries;
@ -61,13 +75,106 @@ module.exports = class EmailAnalyticsService {
const result = new EventProcessingResult();
for (const event of events) {
const batchResult = await this.eventProcessor.process(event);
const batchResult = await this.processEvent(event);
result.merge(batchResult);
}
return result;
}
/**
*
* @param {{type: any; severity: any; recipientEmail: any; emailId: any; providerId: string; timestamp: Date;}} event
* @returns {Promise<EventProcessingResult>}
*/
async processEvent(event) {
if (event.type === 'delivered') {
const recipient = await this.eventProcessor.handleDelivered({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp);
if (recipient) {
return new EventProcessingResult({
delivered: 1,
emailIds: [recipient.emailId],
memberIds: [recipient.memberId]
});
}
return new EventProcessingResult({unprocessable: 1});
}
if (event.type === 'opened') {
const recipient = await this.eventProcessor.handleOpened({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp);
if (recipient) {
return new EventProcessingResult({
opened: 1,
emailIds: [recipient.emailId],
memberIds: [recipient.memberId]
});
}
return new EventProcessingResult({unprocessable: 1});
}
if (event.type === 'failed') {
if (event.severity === 'permanent') {
const recipient = await this.eventProcessor.handlePermanentFailed({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp);
if (recipient) {
return new EventProcessingResult({
permanentFailed: 1,
emailIds: [recipient.emailId],
memberIds: [recipient.memberId]
});
}
return new EventProcessingResult({unprocessable: 1});
} else {
const recipient = await this.eventProcessor.handleTemporaryFailed({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp);
if (recipient) {
return new EventProcessingResult({
temporaryFailed: 1,
emailIds: [recipient.emailId],
memberIds: [recipient.memberId]
});
}
return new EventProcessingResult({unprocessable: 1});
}
}
if (event.type === 'unsubscribed') {
const recipient = await this.eventProcessor.handleUnsubscribed({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp);
if (recipient) {
return new EventProcessingResult({
unsubscribed: 1,
emailIds: [recipient.emailId],
memberIds: [recipient.memberId]
});
}
return new EventProcessingResult({unprocessable: 1});
}
if (event.type === 'complained') {
const recipient = await this.eventProcessor.handleComplained({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp);
if (recipient) {
return new EventProcessingResult({
complained: 1,
emailIds: [recipient.emailId],
memberIds: [recipient.memberId]
});
}
return new EventProcessingResult({unprocessable: 1});
}
return new EventProcessingResult({unhandled: 1});
}
async aggregateStats({emailIds = [], memberIds = []}) {
for (const emailId of emailIds) {
await this.aggregateEmailStats(emailId);

View File

@ -1,6 +1,20 @@
const _ = require('lodash');
class EventProcessingResult {
/**
* @param {object} result
* @param {number} [result.delivered]
* @param {number} [result.opened]
* @param {number} [result.temporaryFailed]
* @param {number} [result.permanentFailed]
* @param {number} [result.unsubscribed]
* @param {number} [result.complained]
* @param {number} [result.unhandled]
* @param {number} [result.unprocessable]
* @param {number} [result.processingFailures]
* @param {string[]} [result.emailIds]
* @param {string[]} [result.memberIds]
*/
constructor(result = {}) {
// counts
this.delivered = 0;

View File

@ -1,210 +0,0 @@
module.exports = class EventProcessor {
constructor() {
}
// override these in a sub-class to define app-specific behavior
async getEmailId(/*event*/) {
return undefined;
}
async getMemberId(/*event*/) {
return undefined;
}
async handleDelivered(/*event*/) {
return false;
}
async handleOpened(/*event*/) {
return false;
}
async handleTemporaryFailed(/*event*/) {
return false;
}
async handlePermanentFailed(/*event*/) {
return false;
}
async handleUnsubscribed(/*event*/) {
return false;
}
async handleComplained(/*event*/) {
return false;
}
// superclass functionality ------------------------------------------------
async process(event) {
if (event.type === 'delivered') {
return this._handleDelivered(event);
}
if (event.type === 'opened') {
return this._handleOpened(event);
}
if (event.type === 'failed') {
if (event.severity === 'permanent') {
return this._handlePermanentFailed(event);
} else {
return this._handleTemporaryFailed(event);
}
}
if (event.type === 'unsubscribed') {
return this._handleUnsubscribed(event);
}
if (event.type === 'complained') {
return this._handleComplained(event);
}
return {
unhandled: 1
};
}
async _handleDelivered(event) {
const emailId = await this._getEmailId(event);
if (!emailId) {
return {unprocessable: 1};
}
const handlerSuccess = await this.handleDelivered(event);
if (handlerSuccess) {
const memberId = await this._getMemberId(event);
return {
delivered: 1,
emailIds: [emailId],
memberIds: [memberId]
};
}
return {unprocessable: 1};
}
async _handleOpened(event) {
const emailId = await this._getEmailId(event);
if (!emailId) {
return {unprocessable: 1};
}
const handlerSuccess = await this.handleOpened(event);
if (handlerSuccess) {
const memberId = await this._getMemberId(event);
return {
opened: 1,
emailIds: [emailId],
memberIds: [memberId]
};
}
return {unprocessable: 1};
}
async _handlePermanentFailed(event) {
const emailId = await this._getEmailId(event);
if (!emailId) {
return {unprocessable: 1};
}
const handlerSuccess = await this.handlePermanentFailed(event);
if (handlerSuccess) {
return {
permanentFailed: 1,
emailIds: [emailId]
};
}
return {unprocessable: 1};
}
async _handleTemporaryFailed(event) {
const emailId = await this._getEmailId(event);
if (!emailId) {
return {unprocessable: 1};
}
const handlerSuccess = await this.handleTemporaryFailed(event);
if (handlerSuccess) {
return {
temporaryFailed: 1,
emailIds: [emailId]
};
}
return {unprocessable: 1};
}
async _handleUnsubscribed(event) {
const emailId = await this._getEmailId(event);
if (!emailId) {
return {unprocessable: 1};
}
const handlerSuccess = await this.handleUnsubscribed(event);
if (handlerSuccess) {
return {
unsubscribed: 1,
emailIds: [emailId]
};
}
return {
unprocessable: 1
};
}
async _handleComplained(event) {
const emailId = await this._getEmailId(event);
if (!emailId) {
return {unprocessable: 1};
}
const handlerSuccess = await this.handleComplained(event);
if (handlerSuccess) {
return {
complained: 1,
emailIds: [emailId]
};
}
return {
unprocessable: 1
};
}
async _getEmailId(event) {
if (event.emailId) {
return event.emailId;
}
return await this.getEmailId(event);
}
async _getMemberId(event) {
if (event.memberId) {
return event.memberId;
}
return await this.getMemberId(event);
}
};

View File

@ -5,22 +5,35 @@ require('./utils');
const sinon = require('sinon');
const {
EmailAnalyticsService,
EventProcessor
EmailAnalyticsService
} = require('..');
const EventProcessingResult = require('../lib/event-processing-result');
describe('EmailAnalyticsService', function () {
let eventProcessor;
beforeEach(function () {
eventProcessor = {};
eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => {
return {
emailId,
emailRecipientId: emailId,
memberId: 1
};
});
eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => {
return {
emailId,
emailRecipientId: emailId,
memberId: 1
};
});
});
describe('fetchAll', function () {
let eventProcessor;
let providers;
let queries;
beforeEach(function () {
eventProcessor = new EventProcessor();
eventProcessor.handleDelivered = sinon.fake.resolves(true);
eventProcessor.handleOpened = sinon.fake.resolves(true);
providers = {
testing: {
async fetchAll(batchHandler) {
@ -102,9 +115,6 @@ describe('EmailAnalyticsService', function () {
describe('processEventBatch', function () {
it('uses passed-in event processor', async function () {
const eventProcessor = new EventProcessor();
eventProcessor.handleDelivered = sinon.stub().resolves(true);
const service = new EmailAnalyticsService({
eventProcessor
});
@ -124,8 +134,10 @@ describe('EmailAnalyticsService', function () {
result.should.deepEqual(new EventProcessingResult({
delivered: 2,
unprocessable: 1,
emailIds: [1, 2]
opened: 1,
unprocessable: 0,
emailIds: [1, 2],
memberIds: [1]
}));
});
});

View File

@ -1,431 +0,0 @@
// Switch these lines once there are useful utils
// const testUtils = require('./utils');
require('./utils');
const sinon = require('sinon');
const {EventProcessor} = require('..');
class CustomEventProcessor extends EventProcessor {
constructor() {
super(...arguments);
this.getEmailId = sinon.fake.resolves('emailId');
this.getMemberId = sinon.fake.resolves('memberId');
this.handleDelivered = sinon.fake.resolves(true);
this.handleOpened = sinon.fake.resolves(true);
this.handleTemporaryFailed = sinon.fake.resolves(true);
this.handlePermanentFailed = sinon.fake.resolves(true);
this.handleUnsubscribed = sinon.fake.resolves(true);
this.handleComplained = sinon.fake.resolves(true);
}
}
describe('EventProcessor', function () {
let eventProcessor;
beforeEach(function () {
eventProcessor = new CustomEventProcessor();
});
afterEach(function () {
sinon.restore();
});
describe('delivered', function () {
it('works', async function () {
const result = await eventProcessor.process({
type: 'delivered'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.calledOnce.should.be.true();
eventProcessor.handleDelivered.calledOnce.should.be.true();
result.should.deepEqual({
delivered: 1,
emailIds: ['emailId'],
memberIds: ['memberId']
});
});
it('gets emailId and memberId directly from event if available', async function () {
const result = await eventProcessor.process({
type: 'delivered',
emailId: 'testEmailId',
memberId: 'testMemberId'
});
eventProcessor.getEmailId.called.should.be.false();
eventProcessor.getMemberId.called.should.be.false();
eventProcessor.handleDelivered.calledOnce.should.be.true();
result.should.deepEqual({
delivered: 1,
emailIds: ['testEmailId'],
memberIds: ['testMemberId']
});
});
it('does not process if email id is not found', async function () {
sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null));
const result = await eventProcessor.process({
type: 'delivered'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.calledOnce.should.be.false();
eventProcessor.handleDelivered.calledOnce.should.be.false();
result.should.deepEqual({
unprocessable: 1
});
});
it('does not process if handleDelivered is not overridden', async function () {
// test non-extended superclass instance
eventProcessor = new EventProcessor();
const result = await eventProcessor.process({
type: 'delivered',
emailId: 'testEmailId',
memberId: 'testMemberId'
});
result.should.deepEqual({
unprocessable: 1
});
});
});
describe('opened', function () {
it('works', async function () {
const result = await eventProcessor.process({
type: 'opened'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.calledOnce.should.be.true();
eventProcessor.handleOpened.calledOnce.should.be.true();
result.should.deepEqual({
opened: 1,
emailIds: ['emailId'],
memberIds: ['memberId']
});
});
it('gets emailId and memberId directly from event if available', async function () {
const result = await eventProcessor.process({
type: 'opened',
emailId: 'testEmailId',
memberId: 'testMemberId'
});
eventProcessor.getEmailId.called.should.be.false();
eventProcessor.getMemberId.called.should.be.false();
eventProcessor.handleOpened.calledOnce.should.be.true();
result.should.deepEqual({
opened: 1,
emailIds: ['testEmailId'],
memberIds: ['testMemberId']
});
});
it('does not process if email id is not found', async function () {
sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null));
const result = await eventProcessor.process({
type: 'opened'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.calledOnce.should.be.false();
eventProcessor.handleOpened.calledOnce.should.be.false();
result.should.deepEqual({
unprocessable: 1
});
});
it('does not process if handleOpened is not overridden', async function () {
// test non-extended superclass instance
eventProcessor = new EventProcessor();
const result = await eventProcessor.process({
type: 'opened',
emailId: 'testEmailId',
memberId: 'testMemberId'
});
result.should.deepEqual({
unprocessable: 1
});
});
});
describe('failed - permanent', function () {
it('works', async function () {
const result = await eventProcessor.process({
type: 'failed',
severity: 'permanent'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.called.should.be.false();
eventProcessor.handlePermanentFailed.calledOnce.should.be.true();
result.should.deepEqual({
permanentFailed: 1,
emailIds: ['emailId']
});
});
it('gets emailId directly from event if available', async function () {
const result = await eventProcessor.process({
type: 'failed',
severity: 'permanent',
emailId: 'testEmailId'
});
eventProcessor.getEmailId.called.should.be.false();
eventProcessor.getMemberId.called.should.be.false();
eventProcessor.handlePermanentFailed.calledOnce.should.be.true();
result.should.deepEqual({
permanentFailed: 1,
emailIds: ['testEmailId']
});
});
it('does not process if email id is not found', async function () {
sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null));
const result = await eventProcessor.process({
type: 'failed',
severity: 'permanent'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.called.should.be.false();
eventProcessor.handlePermanentFailed.calledOnce.should.be.false();
result.should.deepEqual({
unprocessable: 1
});
});
it('does not process if handlePermanentFailed is not overridden', async function () {
// test non-extended superclass instance
eventProcessor = new EventProcessor();
const result = await eventProcessor.process({
type: 'opened',
severity: 'permanent',
emailId: 'testEmailId'
});
result.should.deepEqual({
unprocessable: 1
});
});
});
describe('failed - temporary', function () {
it('works', async function () {
const result = await eventProcessor.process({
type: 'failed',
severity: 'temporary'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.called.should.be.false();
eventProcessor.handleTemporaryFailed.calledOnce.should.be.true();
result.should.deepEqual({
temporaryFailed: 1,
emailIds: ['emailId']
});
});
it('gets emailId directly from event if available', async function () {
const result = await eventProcessor.process({
type: 'failed',
severity: 'temporary',
emailId: 'testEmailId'
});
eventProcessor.getEmailId.called.should.be.false();
eventProcessor.getMemberId.called.should.be.false();
eventProcessor.handleTemporaryFailed.calledOnce.should.be.true();
result.should.deepEqual({
temporaryFailed: 1,
emailIds: ['testEmailId']
});
});
it('does not process if email id is not found', async function () {
sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null));
const result = await eventProcessor.process({
type: 'failed',
severity: 'temporary'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.called.should.be.false();
eventProcessor.handleTemporaryFailed.calledOnce.should.be.false();
result.should.deepEqual({
unprocessable: 1
});
});
it('does not process if handleTemporaryFailed is not overridden', async function () {
// test non-extended superclass instance
eventProcessor = new EventProcessor();
const result = await eventProcessor.process({
type: 'opened',
severity: 'temporary',
emailId: 'testEmailId'
});
result.should.deepEqual({
unprocessable: 1
});
});
});
describe('unsubscribed', function () {
it('works', async function () {
const result = await eventProcessor.process({
type: 'unsubscribed'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.calledOnce.should.be.false();
eventProcessor.handleUnsubscribed.calledOnce.should.be.true();
result.should.deepEqual({
unsubscribed: 1,
emailIds: ['emailId']
});
});
it('gets emailId and memberId directly from event if available', async function () {
const result = await eventProcessor.process({
type: 'unsubscribed',
emailId: 'testEmailId',
memberId: 'testMemberId'
});
eventProcessor.getEmailId.called.should.be.false();
eventProcessor.getMemberId.called.should.be.false();
eventProcessor.handleUnsubscribed.calledOnce.should.be.true();
result.should.deepEqual({
unsubscribed: 1,
emailIds: ['testEmailId']
});
});
it('does not process if email id is not found', async function () {
sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null));
const result = await eventProcessor.process({
type: 'unsubscribed'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.calledOnce.should.be.false();
eventProcessor.handleUnsubscribed.calledOnce.should.be.false();
result.should.deepEqual({
unprocessable: 1
});
});
it('does not process if handleUnsubscribed is not overridden', async function () {
// test non-extended superclass instance
eventProcessor = new EventProcessor();
const result = await eventProcessor.process({
type: 'unsubscribed',
emailId: 'testEmailId'
});
result.should.deepEqual({
unprocessable: 1
});
});
});
describe('complained', function () {
it('works', async function () {
const result = await eventProcessor.process({
type: 'complained'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.calledOnce.should.be.false();
eventProcessor.handleComplained.calledOnce.should.be.true();
result.should.deepEqual({
complained: 1,
emailIds: ['emailId']
});
});
it('gets emailId and memberId directly from event if available', async function () {
const result = await eventProcessor.process({
type: 'complained',
emailId: 'testEmailId',
memberId: 'testMemberId'
});
eventProcessor.getEmailId.called.should.be.false();
eventProcessor.getMemberId.called.should.be.false();
eventProcessor.handleComplained.calledOnce.should.be.true();
result.should.deepEqual({
complained: 1,
emailIds: ['testEmailId']
});
});
it('does not process if email id is not found', async function () {
sinon.replace(eventProcessor, 'getEmailId', sinon.fake.resolves(null));
const result = await eventProcessor.process({
type: 'complained'
});
eventProcessor.getEmailId.calledOnce.should.be.true();
eventProcessor.getMemberId.calledOnce.should.be.false();
eventProcessor.handleComplained.calledOnce.should.be.false();
result.should.deepEqual({
unprocessable: 1
});
});
it('does not process if handleComplained is not overridden', async function () {
// test non-extended superclass instance
eventProcessor = new EventProcessor();
const result = await eventProcessor.process({
type: 'complained',
emailId: 'testEmailId'
});
result.should.deepEqual({
unprocessable: 1
});
});
});
});

View File

@ -1,4 +1,7 @@
module.exports = {
SpamComplaintEvent: require('./lib/SpamComplaintEvent'),
EmailBouncedEvent: require('./lib/EmailBouncedEvent')
EmailBouncedEvent: require('./lib/EmailBouncedEvent'),
EmailDeliveredEvent: require('./lib/EmailDeliveredEvent'),
EmailOpenedEvent: require('./lib/EmailOpenedEvent'),
EmailUnsubscribedEvent: require('./lib/EmailUnsubscribedEvent')
};

View File

@ -1,7 +1,3 @@
/**
* @typedef {import('bson-objectid').default} ObjectID
*/
module.exports = class EmailBouncedEvent {
/**
* @readonly
@ -11,16 +7,22 @@ module.exports = class EmailBouncedEvent {
/**
* @readonly
* @type {ObjectID}
* @type {string}
*/
memberId;
/**
* @readonly
* @type {ObjectID}
* @type {string}
*/
emailId;
/**
* @readonly
* @type {string}
*/
emailRecipientId;
/**
* @readonly
* @type {Date}
@ -30,10 +32,11 @@ module.exports = class EmailBouncedEvent {
/**
* @private
*/
constructor({email, memberId, emailId, timestamp}) {
this.email = email;
constructor({email, memberId, emailId, emailRecipientId, timestamp}) {
this.memberId = memberId;
this.emailId = emailId;
this.email = email;
this.emailRecipientId = emailRecipientId;
this.timestamp = timestamp;
}

View File

@ -0,0 +1,49 @@
module.exports = class EmailDeliveredEvent {
/**
* @readonly
* @type {string}
*/
email;
/**
* @readonly
* @type {string}
*/
memberId;
/**
* @readonly
* @type {string}
*/
emailId;
/**
* @readonly
* @type {string}
*/
emailRecipientId;
/**
* @readonly
* @type {Date}
*/
timestamp;
/**
* @private
*/
constructor({email, memberId, emailId, emailRecipientId, timestamp}) {
this.email = email;
this.memberId = memberId;
this.emailId = emailId;
this.emailRecipientId = emailRecipientId;
this.timestamp = timestamp;
}
static create(data) {
return new EmailDeliveredEvent({
...data,
timestamp: data.timestamp || new Date
});
}
};

View File

@ -0,0 +1,49 @@
module.exports = class EmailOpenedEvent {
/**
* @readonly
* @type {string}
*/
email;
/**
* @readonly
* @type {string}
*/
memberId;
/**
* @readonly
* @type {string}
*/
emailId;
/**
* @readonly
* @type {string}
*/
emailRecipientId;
/**
* @readonly
* @type {Date}
*/
timestamp;
/**
* @private
*/
constructor({email, memberId, emailId, emailRecipientId, timestamp}) {
this.memberId = memberId;
this.emailId = emailId;
this.emailRecipientId = emailRecipientId;
this.email = email;
this.timestamp = timestamp;
}
static create(data) {
return new EmailOpenedEvent({
...data,
timestamp: data.timestamp || new Date
});
}
};

View File

@ -0,0 +1,42 @@
module.exports = class EmailUnsubscribedEvent {
/**
* @readonly
* @type {string}
*/
email;
/**
* @readonly
* @type {string}
*/
memberId;
/**
* @readonly
* @type {string}
*/
emailId;
/**
* @readonly
* @type {Date}
*/
timestamp;
/**
* @private
*/
constructor({email, memberId, emailId, timestamp}) {
this.memberId = memberId;
this.emailId = emailId;
this.email = email;
this.timestamp = timestamp;
}
static create(data) {
return new EmailUnsubscribedEvent({
...data,
timestamp: data.timestamp || new Date
});
}
};

View File

@ -1,7 +1,3 @@
/**
* @typedef {import('bson-objectid').default} ObjectID
*/
module.exports = class SpamComplaintEvent {
/**
* @readonly
@ -11,13 +7,13 @@ module.exports = class SpamComplaintEvent {
/**
* @readonly
* @type {ObjectID}
* @type {string}
*/
memberId;
/**
* @readonly
* @type {ObjectID}
* @type {string}
*/
emailId;
@ -31,9 +27,9 @@ module.exports = class SpamComplaintEvent {
* @private
*/
constructor({email, memberId, emailId, timestamp}) {
this.email = email;
this.memberId = memberId;
this.emailId = emailId;
this.email = email;
this.timestamp = timestamp;
}

View File

@ -6,8 +6,9 @@ describe('EmailBouncedEvent', function () {
it('exports a static create method to create instances', function () {
const event = EmailBouncedEvent.create({
email: 'test@test.test',
memberId: new ObjectID(),
emailId: new ObjectID(),
memberId: new ObjectID().toHexString(),
emailId: new ObjectID().toHexString(),
emailRecipientId: new ObjectID().toHexString(),
timestamp: new Date()
});
assert(event instanceof EmailBouncedEvent);

View File

@ -0,0 +1,16 @@
const assert = require('assert');
const ObjectID = require('bson-objectid').default;
const EmailDeliveredEvent = require('../../lib/EmailDeliveredEvent');
describe('EmailDeliveredEvent', function () {
it('exports a static create method to create instances', function () {
const event = EmailDeliveredEvent.create({
email: 'test@test.test',
memberId: new ObjectID().toHexString(),
emailId: new ObjectID().toHexString(),
emailRecipientId: new ObjectID().toHexString(),
timestamp: new Date()
});
assert(event instanceof EmailDeliveredEvent);
});
});

View File

@ -0,0 +1,16 @@
const assert = require('assert');
const ObjectID = require('bson-objectid').default;
const EmailOpenedEvent = require('../../lib/EmailOpenedEvent');
describe('EmailOpenedEvent', function () {
it('exports a static create method to create instances', function () {
const event = EmailOpenedEvent.create({
email: 'test@test.test',
memberId: new ObjectID().toHexString(),
emailId: new ObjectID().toHexString(),
emailRecipientId: new ObjectID().toHexString(),
timestamp: new Date()
});
assert(event instanceof EmailOpenedEvent);
});
});

View File

@ -0,0 +1,15 @@
const assert = require('assert');
const ObjectID = require('bson-objectid').default;
const EmailUnsubscribedEvent = require('../../lib/EmailUnsubscribedEvent');
describe('EmailUnsubscribedEvent', function () {
it('exports a static create method to create instances', function () {
const event = EmailUnsubscribedEvent.create({
email: 'test@test.test',
memberId: new ObjectID().toHexString(),
emailId: new ObjectID().toHexString(),
timestamp: new Date()
});
assert(event instanceof EmailUnsubscribedEvent);
});
});

View File

@ -6,8 +6,8 @@ describe('SpamComplaintEvent', function () {
it('exports a static create method to create instances', function () {
const event = SpamComplaintEvent.create({
email: 'test@test.test',
memberId: new ObjectID(),
emailId: new ObjectID(),
memberId: new ObjectID().toHexString(),
emailId: new ObjectID().toHexString(),
timestamp: new Date()
});
assert(event instanceof SpamComplaintEvent);

View File

@ -4,5 +4,7 @@ module.exports = {
EmailRenderer: require('./lib/email-renderer'),
EmailSegmenter: require('./lib/email-segmenter'),
SendingService: require('./lib/sending-service'),
BatchSendingService: require('./lib/batch-sending-service')
BatchSendingService: require('./lib/batch-sending-service'),
EmailEventProcessor: require('./lib/email-event-processor'),
EmailEventStorage: require('./lib/email-event-storage')
};

View File

@ -0,0 +1,182 @@
const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, SpamComplaintEvent, EmailUnsubscribedEvent} = require('@tryghost/email-events');
/**
* @typedef EmailIdentification
* @property {string} email
* @property {string} providerId
* @property {string} [emailId] Optional email id
*/
/**
* @typedef EmailRecipientInformation
* @property {string} emailRecipientId
* @property {string} memberId
* @property {string} emailId
*/
/**
* WARNING: this class is used in a separate thread (an offloaded job). Be careful when working with settings and models.
*/
class EmailEventProcessor {
#domainEvents;
#db;
constructor({domainEvents, db}) {
this.#domainEvents = domainEvents;
this.#db = db;
// Avoid having to query email_batch by provider_id for every event
this.providerIdEmailIdMap = {};
}
/**
* @param {EmailIdentification} emailIdentification
* @param {Date} timestamp
*/
async handleDelivered(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(EmailDeliveredEvent.create({
email: emailIdentification.email,
emailRecipientId: recipient.emailRecipientId,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
}));
}
return recipient;
}
/**
* @param {EmailIdentification} emailIdentification
* @param {Date} timestamp
*/
async handleOpened(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(EmailOpenedEvent.create({
email: emailIdentification.email,
emailRecipientId: recipient.emailRecipientId,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
}));
}
return recipient;
}
/**
* @param {EmailIdentification} emailIdentification
*/
async handleTemporaryFailed(emailIdentification) {
const recipient = await this.getRecipient(emailIdentification);
// TODO: store and emit event
return recipient;
}
/**
* @param {EmailIdentification} emailIdentification
* @param {Date} timestamp
*/
async handlePermanentFailed(emailIdentification, timestamp) {
// TODO: also read error message
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(EmailBouncedEvent.create({
email: emailIdentification.email,
memberId: recipient.memberId,
emailId: recipient.emailId,
emailRecipientId: recipient.emailRecipientId,
timestamp
}));
}
return recipient;
}
/**
* @param {EmailIdentification} emailIdentification
* @param {Date} timestamp
*/
async handleUnsubscribed(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(EmailUnsubscribedEvent.create({
email: emailIdentification.email,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
}));
}
return recipient;
}
/**
* @param {EmailIdentification} emailIdentification
* @param {Date} timestamp
*/
async handleComplained(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(SpamComplaintEvent.create({
email: emailIdentification.email,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
}));
}
return recipient;
}
/**
* @private
* @param {EmailIdentification} emailIdentification
* @returns {Promise<EmailRecipientInformation|undefined>}
*/
async getRecipient(emailIdentification) {
// With the provider_id and email address we can look for the EmailRecipient
const emailId = emailIdentification.emailId ?? await this.getEmailId(emailIdentification.providerId);
if (!emailId) {
// Invalid
return;
}
const {id: emailRecipientId, member_id: memberId} = await this.#db.knex('email_recipients')
.select('id', 'member_id')
.where('member_email', emailIdentification.email)
.where('email_id', emailId)
.first() || {};
if (emailRecipientId && memberId) {
return {
emailRecipientId,
memberId,
emailId
};
}
}
/**
* @private
* @param {string} providerId
* @returns {Promise<string|undefined>}
*/
async getEmailId(providerId) {
if (this.providerIdEmailIdMap[providerId]) {
return this.providerIdEmailIdMap[providerId];
}
const {emailId} = await this.#db.knex('email_batches')
.select('email_id as emailId')
.where('provider_id', providerId)
.first() || {};
if (!emailId) {
return;
}
this.providerIdEmailIdMap[providerId] = emailId;
return emailId;
}
}
module.exports = EmailEventProcessor;

View File

@ -0,0 +1,72 @@
const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, EmailUnsubscribedEvent, SpamComplaintEvent} = require('@tryghost/email-events');
const moment = require('moment-timezone');
class EmailEventStorage {
#db;
#membersRepository;
constructor({db, membersRepository}) {
this.#db = db;
this.#membersRepository = membersRepository;
}
listen(domainEvents) {
domainEvents.subscribe(EmailDeliveredEvent, async (event) => {
await this.handleDelivered(event);
});
domainEvents.subscribe(EmailOpenedEvent, async (event) => {
await this.handleOpened(event);
});
domainEvents.subscribe(EmailBouncedEvent, async (event) => {
await this.handlePermanentFailed(event);
});
domainEvents.subscribe(EmailUnsubscribedEvent, async (event) => {
await this.handleUnsubscribed(event);
});
domainEvents.subscribe(SpamComplaintEvent, async (event) => {
await this.handleComplained(event);
});
}
async handleDelivered(event) {
await this.#db.knex('email_recipients')
.where('id', '=', event.emailRecipientId)
.update({
delivered_at: this.#db.knex.raw('COALESCE(delivered_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
});
}
async handleOpened(event) {
await this.#db.knex('email_recipients')
.where('id', '=', event.emailRecipientId)
.update({
opened_at: this.#db.knex.raw('COALESCE(opened_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
});
}
async handlePermanentFailed(event) {
await this.#db.knex('email_recipients')
.where('id', '=', event.emailRecipientId)
.update({
failed_at: this.#db.knex.raw('COALESCE(failed_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
});
}
async handleUnsubscribed(event) {
return this.unsubscribeFromNewsletters(event);
}
async handleComplained(event) {
return this.unsubscribeFromNewsletters(event);
}
async unsubscribeFromNewsletters(event) {
await this.#membersRepository.update({newsletters: []}, {id: event.memberId});
}
}
module.exports = EmailEventStorage;

View File

@ -26,6 +26,8 @@
"dependencies": {
"@tryghost/errors": "1.2.18",
"@tryghost/tpl": "0.1.19",
"bson-objectid": "2.0.4"
"bson-objectid": "2.0.4",
"@tryghost/email-events": "0.0.0",
"moment-timezone": "0.5.23"
}
}

View File

@ -28,16 +28,20 @@ const ALL_STATUSES = {
};
class JobManager {
#domainEvents;
/**
* @param {Object} options
* @param {Function} [options.errorHandler] - custom job error handler
* @param {Function} [options.workerMessageHandler] - custom message handler coming from workers
* @param {Object} [options.JobModel] - a model which can persist job data in the storage
* @param {Object} [options.domainEvents] - domain events emitter
*/
constructor({errorHandler, workerMessageHandler, JobModel}) {
constructor({errorHandler, workerMessageHandler, JobModel, domainEvents}) {
this.queue = fastq(this, worker, 1);
this._jobMessageHandler = this._jobMessageHandler.bind(this);
this._jobErrorHandler = this._jobErrorHandler.bind(this);
this.#domainEvents = domainEvents;
const combinedMessageHandler = workerMessageHandler
? ({name, message}) => {
@ -109,6 +113,13 @@ class JobManager {
finished_at: new Date()
});
}
} else {
if (typeof message === 'object' && this.#domainEvents) {
// Is this an event?
if (message.event) {
this.#domainEvents.dispatchRaw(message.event.type, message.event.data);
}
}
}
}
}

View File

@ -1,6 +1,7 @@
const {MemberPageViewEvent, MemberCommentEvent, MemberLinkClickEvent} = require('@tryghost/member-events');
const moment = require('moment-timezone');
const {IncorrectUsageError} = require('@tryghost/errors');
const {EmailOpenedEvent} = require('@tryghost/email-events');
/**
* Listen for `MemberViewEvent` to update the `member.last_seen_at` timestamp
@ -42,6 +43,10 @@ class LastSeenAtUpdater {
domainEvents.subscribe(MemberCommentEvent, async (event) => {
await this.updateLastCommentedAt(event.data.memberId, event.timestamp);
});
domainEvents.subscribe(EmailOpenedEvent, async (event) => {
await this.updateLastSeenAtWithoutKnownLastSeen(event.memberId, event.timestamp);
});
}
/**
@ -50,13 +55,29 @@ class LastSeenAtUpdater {
* - memberLastSeenAt is 2022-02-27 23:00:00, timestamp is current time, then `last_seen_at` is set to the current time
* - memberLastSeenAt is 2022-02-28 01:00:00, timestamp is current time, then `last_seen_at` isn't changed
* @param {string} memberId The id of the member to be udpated
* @param {string} memberLastSeenAt The previous last_seen_at property value for the current member
* @param {Date} timestamp The event timestamp
*/
async updateLastSeenAtWithoutKnownLastSeen(memberId, timestamp) {
// Fetch manually
const membersApi = this._getMembersApi();
const member = await membersApi.members.get({id: memberId}, {require: true});
const memberLastSeenAt = member.get('last_seen_at');
await this.updateLastSeenAt(memberId, memberLastSeenAt, timestamp);
}
/**
* Updates the member.last_seen_at field if it wasn't updated in the current day yet (in the publication timezone)
* Example: current time is 2022-02-28 18:00:00
* - memberLastSeenAt is 2022-02-27 23:00:00, timestamp is current time, then `last_seen_at` is set to the current time
* - memberLastSeenAt is 2022-02-28 01:00:00, timestamp is current time, then `last_seen_at` isn't changed
* @param {string} memberId The id of the member to be udpated
* @param {string|null} memberLastSeenAt The previous last_seen_at property value for the current member
* @param {Date} timestamp The event timestamp
*/
async updateLastSeenAt(memberId, memberLastSeenAt, timestamp) {
const timezone = this._settingsCacheService.get('timezone');
if (memberLastSeenAt === null || moment(moment.utc(timestamp).tz(timezone).startOf('day')).isAfter(memberLastSeenAt)) {
const membersApi = await this._getMembersApi();
const membersApi = this._getMembersApi();
await membersApi.members.update({
last_seen_at: moment.utc(timestamp).format('YYYY-MM-DD HH:mm:ss')
}, {
@ -74,7 +95,7 @@ class LastSeenAtUpdater {
* @param {Date} timestamp The event timestamp
*/
async updateLastCommentedAt(memberId, timestamp) {
const membersApi = await this._getMembersApi();
const membersApi = this._getMembersApi();
const member = await membersApi.members.get({id: memberId}, {require: true});
const timezone = this._settingsCacheService.get('timezone');

View File

@ -8,6 +8,13 @@ const {LastSeenAtUpdater} = require('../');
const DomainEvents = require('@tryghost/domain-events');
const {MemberPageViewEvent, MemberCommentEvent} = require('@tryghost/member-events');
const moment = require('moment');
const {EmailOpenedEvent} = require('@tryghost/email-events');
async function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
describe('LastSeenAtUpdater', function () {
it('Calls updateLastSeenAt on MemberPageViewEvents', async function () {
@ -21,7 +28,7 @@ describe('LastSeenAtUpdater', function () {
get: settingsCache
}
},
async getMembersApi() {
getMembersApi() {
return {
members: {
update: stub
@ -35,6 +42,43 @@ describe('LastSeenAtUpdater', function () {
assert(updater.updateLastSeenAt.calledOnceWithExactly('1', previousLastSeen, now.toDate()));
});
it('Calls updateLastSeenAt on email opened events', async function () {
const now = moment('2022-02-28T18:00:00Z').utc();
const previousLastSeen = moment('2022-02-27T23:00:00Z').toISOString();
const stub = sinon.stub().resolves();
const getStub = sinon.stub().resolves({
get() {
return previousLastSeen;
}
});
const settingsCache = sinon.stub().returns('Etc/UTC');
const updater = new LastSeenAtUpdater({
services: {
settingsCache: {
get: settingsCache
}
},
getMembersApi() {
return {
members: {
update: stub,
get: getStub
}
};
}
});
updater.subscribe(DomainEvents);
sinon.spy(updater, 'updateLastSeenAt');
sinon.spy(updater, 'updateLastSeenAtWithoutKnownLastSeen');
DomainEvents.dispatch(EmailOpenedEvent.create({memberId: '1', emailRecipientId: '1', emailId: '1', timestamp: now.toDate()}));
// Wait for next tick
await sleep(50);
assert(updater.updateLastSeenAt.calledOnceWithExactly('1', previousLastSeen, now.toDate()));
assert(updater.updateLastSeenAtWithoutKnownLastSeen.calledOnceWithExactly('1', now.toDate()));
assert(getStub.calledOnce);
assert(stub.calledOnce);
});
it('Calls updateLastCommentedAt on MemberCommentEvents', async function () {
const now = moment('2022-02-28T18:00:00Z').utc();
const stub = sinon.stub().resolves();
@ -45,7 +89,7 @@ describe('LastSeenAtUpdater', function () {
get: settingsCache
}
},
async getMembersApi() {
getMembersApi() {
return {
members: {
update: stub
@ -70,7 +114,7 @@ describe('LastSeenAtUpdater', function () {
get: settingsCache
}
},
async getMembersApi() {
getMembersApi() {
return {
members: {
update: stub
@ -93,7 +137,7 @@ describe('LastSeenAtUpdater', function () {
get: settingsCache
}
},
async getMembersApi() {
getMembersApi() {
return {
members: {
update: stub,
@ -124,7 +168,7 @@ describe('LastSeenAtUpdater', function () {
get: settingsCache
}
},
async getMembersApi() {
getMembersApi() {
return {
members: {
update: stub
@ -151,7 +195,7 @@ describe('LastSeenAtUpdater', function () {
get: settingsCache
}
},
async getMembersApi() {
getMembersApi() {
return {
members: {
update: stub
@ -174,7 +218,7 @@ describe('LastSeenAtUpdater', function () {
get: settingsCache
}
},
async getMembersApi() {
getMembersApi() {
return {
members: {
update: stub,
@ -205,7 +249,7 @@ describe('LastSeenAtUpdater', function () {
get: settingsCache
}
},
async getMembersApi() {
getMembersApi() {
return {
members: {
update: stub,
@ -236,7 +280,7 @@ describe('LastSeenAtUpdater', function () {
get: settingsCache
}
},
async getMembersApi() {
getMembersApi() {
return {
members: {
update: stub,
@ -274,7 +318,7 @@ describe('LastSeenAtUpdater', function () {
get: settingsCache
}
},
async getMembersApi() {
getMembersApi() {
return {
members: {
update: stub,
@ -311,7 +355,7 @@ describe('LastSeenAtUpdater', function () {
get: settingsCache
}
},
async getMembersApi() {
getMembersApi() {
return {
members: {
update: stub