Added storage for email failures (#15901)

fixes https://github.com/TryGhost/Team/issues/2332

Saves events in the database and collects error information.

Do note that we can emit the same events multiple times, and as a result
out of order. That means we should correctly handle that a delivered
event might be fired after a permanent failure. So a delivered event is
ignored if the email is already marked as failed. Also delivered_at is
reset to null when we receive a permanent failure.
This commit is contained in:
Simon Backx 2022-12-01 10:00:53 +01:00 committed by GitHub
parent a1ee04b08f
commit d8187123af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 892 additions and 37 deletions

View File

@ -0,0 +1,29 @@
const ghostBookshelf = require('./base');
const EmailRecipientFailure = ghostBookshelf.Model.extend({
tableName: 'email_recipient_failures',
hasTimestamps: false,
defaults() {
return {
};
},
email() {
return this.belongsTo('Email', 'email_id');
},
member() {
return this.belongsTo('Member', 'member_id');
},
emailRecipient() {
return this.belongsTo('EmailRecipient', 'email_recipient_id');
}
}, {
});
module.exports = {
EmailRecipientFailure: ghostBookshelf.model('EmailRecipientFailure', EmailRecipientFailure)
};

View File

@ -14,7 +14,7 @@ class EmailServiceWrapper {
}
const {EmailService, EmailController, EmailRenderer, SendingService, BatchSendingService, EmailSegmenter, EmailEventStorage, MailgunEmailProvider} = require('@tryghost/email-service');
const {Post, Newsletter, Email, EmailBatch, EmailRecipient, Member} = require('../../models');
const {Post, Newsletter, Email, EmailBatch, EmailRecipient, Member, EmailRecipientFailure} = require('../../models');
const MailgunClient = require('@tryghost/mailgun-client');
const configService = require('../../../shared/config');
const settingsCache = require('../../../shared/settings-cache');
@ -112,7 +112,10 @@ class EmailServiceWrapper {
this.eventStorage = new EmailEventStorage({
db,
membersRepository
membersRepository,
models: {
EmailRecipientFailure
}
});
this.eventStorage.listen(domainEvents);
}

View File

@ -6,6 +6,7 @@ const domainEvents = require('@tryghost/domain-events');
const MailgunClient = require('@tryghost/mailgun-client');
const {run} = require('../../../../core/server/services/email-analytics/jobs/fetch-latest/run.js');
const membersService = require('../../../../core/server/services/members');
const {EmailDeliveredEvent} = require('@tryghost/email-events');
async function sleep(ms) {
return new Promise((resolve) => {
@ -13,6 +14,14 @@ async function sleep(ms) {
});
}
async function resetFailures(emailId) {
await models.EmailRecipientFailure.destroy({
destroyBy: {
email_id: emailId
}
});
}
// Test the whole E2E flow from Mailgun events -> handling and storage
describe('EmailEventStorage', function () {
let _mailgunClient;
@ -204,17 +213,47 @@ describe('EmailEventStorage', function () {
events = [{
event: 'failed',
id: 'pl271FzxTTmGRW8Uj3dUWw',
'log-level': 'error',
severity: 'permanent',
recipient: emailRecipient.member_email,
'user-variables': {
'email-id': emailId
reason: 'suppress-bounce',
envelope: {
sender: 'john@example.org',
transport: 'smtp',
targets: 'joan@example.com'
},
flags: {
'is-routed': false,
'is-authenticated': true,
'is-system-test': false,
'is-test-mode': false
},
'delivery-status': {
'attempt-no': 1,
message: '',
code: 605,
description: 'Not delivering to previously bounced address',
'session-seconds': 0.0
},
message: {
headers: {
'message-id': providerId
}
to: 'joan@example.com',
'message-id': providerId,
from: 'john@example.org',
subject: 'Test Subject'
},
attachments: [],
size: 867
},
// unix timestamp
storage: {
url: 'https://se.api.mailgun.net/v3/domains/example.org/messages/eyJwI...',
key: 'eyJwI...'
},
recipient: emailRecipient.member_email,
'recipient-domain': 'mailgun.com',
campaigns: [],
tags: [],
'user-variables': {},
timestamp: Math.round(timestamp.getTime() / 1000)
}];
@ -223,6 +262,7 @@ describe('EmailEventStorage', function () {
}, {require: true});
assert.equal(initialModel.get('failed_at'), null);
assert.notEqual(initialModel.get('delivered_at'), null);
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
@ -242,9 +282,147 @@ describe('EmailEventStorage', function () {
}, {require: true});
assert.equal(updatedEmailRecipient.get('failed_at').toUTCString(), timestamp.toUTCString());
// Check delivered at is reset back to null
assert.equal(updatedEmailRecipient.get('delivered_at'), null);
// Check we have a stored permanent failure
const permanentFailures = await models.EmailRecipientFailure.findAll({
filter: `email_recipient_id:${emailRecipient.id}`
});
assert.equal(permanentFailures.length, 1);
assert.equal(permanentFailures.models[0].get('message'), 'Not delivering to previously bounced address');
assert.equal(permanentFailures.models[0].get('code'), 605);
assert.equal(permanentFailures.models[0].get('enhanced_code'), null);
assert.equal(permanentFailures.models[0].get('email_id'), emailId);
assert.equal(permanentFailures.models[0].get('member_id'), memberId);
assert.equal(permanentFailures.models[0].get('event_id'), 'pl271FzxTTmGRW8Uj3dUWw');
assert.equal(permanentFailures.models[0].get('severity'), 'permanent');
assert.equal(permanentFailures.models[0].get('failed_at').toUTCString(), timestamp.toUTCString());
// Sometimes we emit events outside of order beacuse of the TRUST_THRESHOLD of the provider-mailgun class.
// Check if we handle this correctly.
// Manually emit the delivered event again, and see if it is ignored correctly
// @ts-ignore
domainEvents.dispatch(EmailDeliveredEvent.create({
email: emailRecipient.member_email,
emailRecipientId: emailRecipient.id,
memberId: memberId,
emailId: emailId,
timestamp
}));
// Now wait for events processed
await sleep(200);
// Check delivered at is not set again
const updatedEmailRecipient2 = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
assert.equal(updatedEmailRecipient2.get('failed_at').toUTCString(), timestamp.toUTCString());
// Check delivered at is reset back to null
assert.equal(updatedEmailRecipient2.get('delivered_at'), null, 'A delivered event after a permanent failure event should be ignored');
});
it('Can handle tempoary failure events', async function () {
it('Ignores permanent failures if already failed', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
const emailRecipient = fixtureManager.get('email_recipients', 0);
assert(emailRecipient.batch_id === emailBatch.id);
const memberId = emailRecipient.member_id;
const providerId = emailBatch.provider_id;
const timestamp = new Date(2001, 0, 1);
events = [{
event: 'failed',
id: 'pl271FzxTTmGRW8Uj3dUWw2',
'log-level': 'error',
severity: 'permanent',
reason: 'suppress-bounce',
envelope: {
sender: 'john@example.org',
transport: 'smtp',
targets: 'joan@example.com'
},
flags: {
'is-routed': false,
'is-authenticated': true,
'is-system-test': false,
'is-test-mode': false
},
'delivery-status': {
'attempt-no': 1,
message: '',
code: 500,
description: 'Different message',
'session-seconds': 0.0
},
message: {
headers: {
to: 'joan@example.com',
'message-id': providerId,
from: 'john@example.org',
subject: 'Test Subject'
},
attachments: [],
size: 867
},
storage: {
url: 'https://se.api.mailgun.net/v3/domains/example.org/messages/eyJwI...',
key: 'eyJwI...'
},
recipient: emailRecipient.member_email,
'recipient-domain': 'mailgun.com',
campaigns: [],
tags: [],
'user-variables': {},
timestamp: Math.round(timestamp.getTime() / 1000)
}];
const initialModel = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
assert.notEqual(initialModel.get('failed_at'), null, 'This test requires a failed email recipient');
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
// Now wait for events processed
await sleep(200);
// Check if status has changed to delivered, with correct timestamp
const updatedEmailRecipient = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
// Not changed failed_at
assert.equal(updatedEmailRecipient.get('failed_at').toUTCString(), initialModel.get('failed_at').toUTCString());
// Check we have a stored permanent failure
const permanentFailures = await models.EmailRecipientFailure.findAll({
filter: `email_recipient_id:${emailRecipient.id}`
});
assert.equal(permanentFailures.length, 1);
// Message and code not changed
assert.equal(permanentFailures.models[0].get('message'), 'Not delivering to previously bounced address');
assert.equal(permanentFailures.models[0].get('code'), 605);
assert.equal(permanentFailures.models[0].get('enhanded_code'), null);
assert.notEqual(permanentFailures.models[0].get('failed_at').toUTCString(), timestamp.toUTCString());
});
it('Can handle temporary failure events', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
@ -258,6 +436,7 @@ describe('EmailEventStorage', function () {
await models.EmailRecipient.edit({failed_at: null}, {
id: emailRecipient.id
});
await resetFailures(emailId);
events = [{
event: 'failed',
@ -266,13 +445,61 @@ describe('EmailEventStorage', function () {
'user-variables': {
'email-id': emailId
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000),
tags: [],
storage: {
url: 'https://storage-us-east4.api.mailgun.net/v3/domains/...',
region: 'us-east4',
key: 'AwABB...',
env: 'production'
},
'delivery-status': {
tls: true,
'mx-host': 'hotmail-com.olc.protection.outlook.com',
code: 451,
description: '',
'session-seconds': 0.7517080307006836,
utf8: true,
'retry-seconds': 600,
'enhanced-code': '4.7.652',
'attempt-no': 1,
message: '4.7.652 The mail server [xxx.xxx.xxx.xxx] has exceeded the maximum number of connections.',
'certificate-verified': true
},
batch: {
id: '633ee6154618b2fed628ccb0'
},
'recipient-domain': 'test.com',
id: 'xYrATi63Rke8EC_s7EoJeA',
campaigns: [],
reason: 'generic',
flags: {
'is-routed': false,
'is-authenticated': true,
'is-system-test': false,
'is-test-mode': false
},
'log-level': 'warn',
template: {
name: 'test'
},
envelope: {
transport: 'smtp',
sender: 'test@test.com',
'sending-ip': 'xxx.xxx.xxx.xxx',
targets: 'test@test.com'
},
message: {
headers: {
'message-id': providerId
}
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000)
to: 'test@test.net',
'message-id': providerId,
from: 'test@test.com',
subject: 'Test send'
},
attachments: [],
size: 3499
}
}];
const initialModel = await models.EmailRecipient.findOne({
@ -299,7 +526,236 @@ describe('EmailEventStorage', function () {
}, {require: true});
// Not mark as failed
assert.equal(initialModel.get('failed_at'), null);
assert.equal(updatedEmailRecipient.get('failed_at'), null);
// Check we have a stored temporary failure
const failures = await models.EmailRecipientFailure.findAll({
filter: `email_recipient_id:${emailRecipient.id}`
});
assert.equal(failures.length, 1);
assert.equal(failures.models[0].get('email_id'), emailId);
assert.equal(failures.models[0].get('member_id'), memberId);
assert.equal(failures.models[0].get('severity'), 'temporary');
assert.equal(failures.models[0].get('event_id'), 'xYrATi63Rke8EC_s7EoJeA');
assert.equal(failures.models[0].get('message'), '4.7.652 The mail server [xxx.xxx.xxx.xxx] has exceeded the maximum number of connections.');
assert.equal(failures.models[0].get('code'), 451);
assert.equal(failures.models[0].get('enhanced_code'), '4.7.652');
assert.equal(failures.models[0].get('failed_at').toUTCString(), timestamp.toUTCString());
});
it('Correctly overwrites temporary failure event with other temporary one', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
const emailRecipient = fixtureManager.get('email_recipients', 0);
assert(emailRecipient.batch_id === emailBatch.id);
const memberId = emailRecipient.member_id;
const providerId = emailBatch.provider_id;
const timestamp = new Date(2001, 0, 1);
events = [{
event: 'failed',
severity: 'temporary',
recipient: emailRecipient.member_email,
'user-variables': {
'email-id': emailId
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000),
tags: [],
storage: {
url: 'https://storage-us-east4.api.mailgun.net/v3/domains/...',
region: 'us-east4',
key: 'AwABB...',
env: 'production'
},
'delivery-status': {
tls: true,
code: 555,
description: '',
utf8: true,
'retry-seconds': 600,
'attempt-no': 1,
message: 'New error message failure',
'certificate-verified': true
},
batch: {
id: '633ee6154618b2fed628ccb0'
},
'recipient-domain': 'test.com',
id: 'updated_event_id',
campaigns: [],
reason: 'generic',
flags: {
'is-routed': false,
'is-authenticated': true,
'is-system-test': false,
'is-test-mode': false
},
'log-level': 'warn',
template: {
name: 'test'
},
envelope: {
transport: 'smtp',
sender: 'test@test.com',
'sending-ip': 'xxx.xxx.xxx.xxx',
targets: 'test@test.com'
},
message: {
headers: {
to: 'test@test.net',
'message-id': providerId,
from: 'test@test.com',
subject: 'Test send'
},
attachments: [],
size: 3499
}
}];
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
assert.equal(result.temporaryFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
// Now wait for events processed
await sleep(200);
// Check if status has changed to delivered, with correct timestamp
const updatedEmailRecipient = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
// Not mark as failed
assert.equal(updatedEmailRecipient.get('failed_at'), null);
// Check we have a stored temporary failure
const failures = await models.EmailRecipientFailure.findAll({
filter: `email_recipient_id:${emailRecipient.id}`
});
assert.equal(failures.length, 1);
assert.equal(failures.models[0].get('email_id'), emailId);
assert.equal(failures.models[0].get('member_id'), memberId);
assert.equal(failures.models[0].get('severity'), 'temporary');
assert.equal(failures.models[0].get('event_id'), 'updated_event_id');
assert.equal(failures.models[0].get('message'), 'New error message failure');
assert.equal(failures.models[0].get('code'), 555);
assert.equal(failures.models[0].get('enhanced_code'), null); // should be set to null instead of kept
assert.equal(failures.models[0].get('failed_at').toUTCString(), timestamp.toUTCString());
});
it('Correctly overwrites permanent failure event with other permanent one', async function () {
const emailBatch = fixtureManager.get('email_batches', 0);
const emailId = emailBatch.email_id;
const emailRecipient = fixtureManager.get('email_recipients', 0);
assert(emailRecipient.batch_id === emailBatch.id);
const memberId = emailRecipient.member_id;
const providerId = emailBatch.provider_id;
const timestamp = new Date(2001, 0, 1);
events = [{
event: 'failed',
severity: 'permanent',
recipient: emailRecipient.member_email,
'user-variables': {
'email-id': emailId
},
// unix timestamp
timestamp: Math.round(timestamp.getTime() / 1000),
tags: [],
storage: {
url: 'https://storage-us-east4.api.mailgun.net/v3/domains/...',
region: 'us-east4',
key: 'AwABB...',
env: 'production'
},
'delivery-status': {
tls: true,
code: 111,
description: '',
utf8: true,
'retry-seconds': 600,
'attempt-no': 1,
message: 'New error message permanent failure',
'certificate-verified': true
},
batch: {
id: '633ee6154618b2fed628ccb0'
},
'recipient-domain': 'test.com',
id: 'updated_permanent_event_id',
campaigns: [],
reason: 'generic',
flags: {
'is-routed': false,
'is-authenticated': true,
'is-system-test': false,
'is-test-mode': false
},
'log-level': 'warn',
template: {
name: 'test'
},
envelope: {
transport: 'smtp',
sender: 'test@test.com',
'sending-ip': 'xxx.xxx.xxx.xxx',
targets: 'test@test.com'
},
message: {
headers: {
to: 'test@test.net',
'message-id': providerId,
from: 'test@test.com',
subject: 'Test send'
},
attachments: [],
size: 3499
}
}];
// Fire event processing
// We use offloading to have correct coverage and usage of worker thread
const {eventStats: result} = await run({
domainEvents
});
assert.equal(result.permanentFailed, 1);
assert.deepEqual(result.emailIds, [emailId]);
assert.deepEqual(result.memberIds, [memberId]);
// Now wait for events processed
await sleep(200);
// Check if status has changed to delivered, with correct timestamp
const updatedEmailRecipient = await models.EmailRecipient.findOne({
id: emailRecipient.id
}, {require: true});
// Not mark as failed
assert.equal(updatedEmailRecipient.get('failed_at').toUTCString(), timestamp.toUTCString());
// Check we have a stored temporary failure
const failures = await models.EmailRecipientFailure.findAll({
filter: `email_recipient_id:${emailRecipient.id}`
});
assert.equal(failures.length, 1);
assert.equal(failures.models[0].get('email_id'), emailId);
assert.equal(failures.models[0].get('member_id'), memberId);
assert.equal(failures.models[0].get('severity'), 'permanent');
assert.equal(failures.models[0].get('event_id'), 'updated_permanent_event_id');
assert.equal(failures.models[0].get('message'), 'New error message permanent failure');
assert.equal(failures.models[0].get('code'), 111);
assert.equal(failures.models[0].get('enhanced_code'), null); // should be set to null instead of kept
assert.equal(failures.models[0].get('failed_at').toUTCString(), timestamp.toUTCString());
});
it('Can handle complaint events', async function () {

View File

@ -84,7 +84,7 @@ module.exports = class EmailAnalyticsService {
/**
*
* @param {{type: any; severity: any; recipientEmail: any; emailId: any; providerId: string; timestamp: Date;}} event
* @param {{id: string, type: any; severity: any; recipientEmail: any; emailId: any; providerId: string; timestamp: Date; error: {code: number; message: string; enhandedCode: string|number} | null}} event
* @returns {Promise<EventProcessingResult>}
*/
async processEvent(event) {
@ -118,7 +118,7 @@ module.exports = class EmailAnalyticsService {
if (event.type === 'failed') {
if (event.severity === 'permanent') {
const recipient = await this.eventProcessor.handlePermanentFailed({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp);
const recipient = await this.eventProcessor.handlePermanentFailed({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, {id: event.id, timestamp: event.timestamp, error: event.error});
if (recipient) {
return new EventProcessingResult({
@ -130,7 +130,7 @@ module.exports = class EmailAnalyticsService {
return new EventProcessingResult({unprocessable: 1});
} else {
const recipient = await this.eventProcessor.handleTemporaryFailed({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, event.timestamp);
const recipient = await this.eventProcessor.handleTemporaryFailed({emailId: event.emailId, providerId: event.providerId, email: event.recipientEmail}, {id: event.id, timestamp: event.timestamp, error: event.error});
if (recipient) {
return new EventProcessingResult({

View File

@ -3,5 +3,6 @@ module.exports = {
EmailBouncedEvent: require('./lib/EmailBouncedEvent'),
EmailDeliveredEvent: require('./lib/EmailDeliveredEvent'),
EmailOpenedEvent: require('./lib/EmailOpenedEvent'),
EmailUnsubscribedEvent: require('./lib/EmailUnsubscribedEvent')
EmailUnsubscribedEvent: require('./lib/EmailUnsubscribedEvent'),
EmailTemporaryBouncedEvent: require('./lib/EmailTemporaryBouncedEvent')
};

View File

@ -1,4 +1,10 @@
module.exports = class EmailBouncedEvent {
/**
* @readonly
* @type {string}
*/
id;
/**
* @readonly
* @type {string}
@ -17,6 +23,12 @@ module.exports = class EmailBouncedEvent {
*/
emailId;
/**
* @readonly
* @type {{message: string, code: number, enhancedCode: string | null}}
*/
error;
/**
* @readonly
* @type {string}
@ -32,10 +44,12 @@ module.exports = class EmailBouncedEvent {
/**
* @private
*/
constructor({email, memberId, emailId, emailRecipientId, timestamp}) {
constructor({id, email, memberId, emailId, error, emailRecipientId, timestamp}) {
this.id = id;
this.memberId = memberId;
this.emailId = emailId;
this.email = email;
this.error = error;
this.emailRecipientId = emailRecipientId;
this.timestamp = timestamp;
}

View File

@ -0,0 +1,63 @@
module.exports = class EmailTemporaryBouncedEvent {
/**
* @readonly
* @type {string}
*/
id;
/**
* @readonly
* @type {string}
*/
email;
/**
* @readonly
* @type {string}
*/
memberId;
/**
* @readonly
* @type {string}
*/
emailId;
/**
* @readonly
* @type {{message: string, code: number, enhancedCode: string | null}}
*/
error;
/**
* @readonly
* @type {string}
*/
emailRecipientId;
/**
* @readonly
* @type {Date}
*/
timestamp;
/**
* @private
*/
constructor({id, email, memberId, emailId, emailRecipientId, error, timestamp}) {
this.id = id;
this.memberId = memberId;
this.emailId = emailId;
this.email = email;
this.error = error;
this.emailRecipientId = emailRecipientId;
this.timestamp = timestamp;
}
static create(data) {
return new EmailTemporaryBouncedEvent({
...data,
timestamp: data.timestamp || new Date
});
}
};

View File

@ -5,11 +5,17 @@ const EmailBouncedEvent = require('../../lib/EmailBouncedEvent');
describe('EmailBouncedEvent', function () {
it('exports a static create method to create instances', function () {
const event = EmailBouncedEvent.create({
id: 'id',
email: 'test@test.test',
memberId: new ObjectID().toHexString(),
emailId: new ObjectID().toHexString(),
emailRecipientId: new ObjectID().toHexString(),
timestamp: new Date()
timestamp: new Date(),
error: {
message: 'test',
code: 1,
enhancedCode: '1.1'
}
});
assert(event instanceof EmailBouncedEvent);
});

View File

@ -0,0 +1,22 @@
const assert = require('assert');
const ObjectID = require('bson-objectid').default;
const EmailTemporaryBouncedEvent = require('../../lib/EmailTemporaryBouncedEvent');
describe('EmailTemporaryBouncedEvent', function () {
it('exports a static create method to create instances', function () {
const event = EmailTemporaryBouncedEvent.create({
id: 'id',
email: 'test@test.test',
memberId: new ObjectID().toHexString(),
emailId: new ObjectID().toHexString(),
emailRecipientId: new ObjectID().toHexString(),
timestamp: new Date(),
error: {
message: 'test',
code: 1,
enhancedCode: '1.1'
}
});
assert(event instanceof EmailTemporaryBouncedEvent);
});
});

View File

@ -1,4 +1,4 @@
const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, SpamComplaintEvent, EmailUnsubscribedEvent} = require('@tryghost/email-events');
const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, SpamComplaintEvent, EmailUnsubscribedEvent, EmailTemporaryBouncedEvent} = require('@tryghost/email-events');
/**
* @typedef EmailIdentification
@ -67,22 +67,34 @@ class EmailEventProcessor {
/**
* @param {EmailIdentification} emailIdentification
* @param {{id: string, timestamp: Date, error: {code: number; message: string; enhandedCode: string|number} | null}} event
*/
async handleTemporaryFailed(emailIdentification) {
async handleTemporaryFailed(emailIdentification, {timestamp, error, id}) {
const recipient = await this.getRecipient(emailIdentification);
// TODO: store and emit event
if (recipient) {
this.#domainEvents.dispatch(EmailTemporaryBouncedEvent.create({
id,
error,
email: emailIdentification.email,
memberId: recipient.memberId,
emailId: recipient.emailId,
emailRecipientId: recipient.emailRecipientId,
timestamp
}));
}
return recipient;
}
/**
* @param {EmailIdentification} emailIdentification
* @param {Date} timestamp
* @param {{id: string, timestamp: Date, error: {code: number; message: string; enhandedCode: string|number} | null}} event
*/
async handlePermanentFailed(emailIdentification, timestamp) {
// TODO: also read error message
async handlePermanentFailed(emailIdentification, {timestamp, error, id}) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
this.#domainEvents.dispatch(EmailBouncedEvent.create({
id,
error,
email: emailIdentification.email,
memberId: recipient.memberId,
emailId: recipient.emailId,

View File

@ -1,40 +1,75 @@
const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, EmailUnsubscribedEvent, SpamComplaintEvent} = require('@tryghost/email-events');
const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, EmailTemporaryBouncedEvent, EmailUnsubscribedEvent, SpamComplaintEvent} = require('@tryghost/email-events');
const moment = require('moment-timezone');
const logging = require('@tryghost/logging');
class EmailEventStorage {
#db;
#membersRepository;
#models;
constructor({db, membersRepository}) {
constructor({db, models, membersRepository}) {
this.#db = db;
this.#models = models;
this.#membersRepository = membersRepository;
}
listen(domainEvents) {
domainEvents.subscribe(EmailDeliveredEvent, async (event) => {
await this.handleDelivered(event);
try {
await this.handleDelivered(event);
} catch (err) {
logging.error(err);
}
});
domainEvents.subscribe(EmailOpenedEvent, async (event) => {
await this.handleOpened(event);
try {
await this.handleOpened(event);
} catch (err) {
logging.error(err);
}
});
domainEvents.subscribe(EmailBouncedEvent, async (event) => {
await this.handlePermanentFailed(event);
try {
await this.handlePermanentFailed(event);
} catch (e) {
logging.error(e);
}
});
domainEvents.subscribe(EmailTemporaryBouncedEvent, async (event) => {
try {
await this.handleTemporaryFailed(event);
} catch (e) {
logging.error(e);
}
});
domainEvents.subscribe(EmailUnsubscribedEvent, async (event) => {
await this.handleUnsubscribed(event);
try {
await this.handleUnsubscribed(event);
} catch (e) {
logging.error(e);
}
});
domainEvents.subscribe(SpamComplaintEvent, async (event) => {
await this.handleComplained(event);
try {
await this.handleComplained(event);
} catch (e) {
logging.error(e);
}
});
}
async handleDelivered(event) {
// To properly handle events that are received out of order (this happens because of polling)
// we only can set an email recipient to delivered if they are not already marked as failed
// Why handle this her? An email can be 'delivered' and later have a delayed bounce event. So we need to prevent that delivered_at is set again.
await this.#db.knex('email_recipients')
.where('id', '=', event.emailRecipientId)
.whereNull('failed_at')
.update({
delivered_at: this.#db.knex.raw('COALESCE(delivered_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
});
@ -52,8 +87,74 @@ class EmailEventStorage {
await this.#db.knex('email_recipients')
.where('id', '=', event.emailRecipientId)
.update({
failed_at: this.#db.knex.raw('COALESCE(failed_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')])
failed_at: this.#db.knex.raw('COALESCE(failed_at, ?)', [moment.utc(event.timestamp).format('YYYY-MM-DD HH:mm:ss')]),
delivered_at: null // Reset in case we have a delayed bounce event
});
await this.saveFailure('permanent', event);
}
async handleTemporaryFailed(event) {
await this.saveFailure('temporary', event);
}
/**
* @private
* @param {'temporary'|'permanent'} severity
* @param {import('@tryghost/email-events').EmailTemporaryBouncedEvent|import('@tryghost/email-events').EmailBouncedEvent} event
* @param {{transacting?: any}} options
* @returns
*/
async saveFailure(severity, event, options = {}) {
if (!event.error) {
logging.warn(`Missing error information provided for ${severity} failure event with id ${event.id}`);
return;
}
if (!options || !options.transacting) {
return await this.#models.EmailRecipientFailure.transaction(async (transacting) => {
await this.saveFailure(severity, event, {transacting});
});
}
// Create a forUpdate transaction
const existing = await this.#models.EmailRecipientFailure.findOne({
filter: `email_recipient_id:${event.emailRecipientId}`
}, {...options, require: false, forUpdate: true});
if (!existing) {
// Create a new failure
await this.#models.EmailRecipientFailure.add({
email_id: event.emailId,
member_id: event.memberId,
email_recipient_id: event.emailRecipientId,
severity,
message: event.error.message,
code: event.error.code,
enhanced_code: event.error.enhancedCode,
failed_at: event.timestamp,
event_id: event.id
}, options);
} else {
if (existing.get('severity') === 'permanent') {
// Already marked as failed, no need to change anything here
return;
}
if (existing.get('failed_at') > event.timestamp) {
/// We can get events out of order, so only save the last one
return;
}
// Update the existing failure
await existing.save({
severity,
message: event.error.message,
code: event.error.code,
enhanced_code: event.error.enhancedCode ?? null,
failed_at: event.timestamp,
event_id: event.id
}, {...options, patch: true});
}
}
async handleUnsubscribed(event) {

View File

@ -172,12 +172,19 @@ module.exports = class MailgunClient {
const providerId = event?.message?.headers['message-id'];
return {
id: event.id,
type: event.event,
severity: event.severity,
recipientEmail: event.recipient,
emailId: event['user-variables'] && event['user-variables']['email-id'],
providerId: providerId,
timestamp: new Date(event.timestamp * 1000)
timestamp: new Date(event.timestamp * 1000),
error: event['delivery-status'] && (typeof (event['delivery-status'].message || event['delivery-status'].description) === 'string') ? {
code: event['delivery-status'].code,
message: (event['delivery-status'].message || event['delivery-status'].description).substring(0, 2000),
enhancedCode: event['delivery-status']['enhanced-code']?.toString()?.substring(0, 50) ?? null
} : null
};
}

View File

@ -277,6 +277,7 @@ describe('MailgunClient', function () {
describe('normalizeEvent()', function () {
it('works', function () {
const event = {
id: 'pl271FzxTTmGRW8Uj3dUWw',
event: 'testEvent',
severity: 'testSeverity',
recipient: 'testRecipient',
@ -300,7 +301,147 @@ describe('MailgunClient', function () {
recipientEmail: 'testRecipient',
emailId: 'testEmailId',
providerId: 'testProviderId',
timestamp: new Date('2021-02-25T17:54:22.000Z')
timestamp: new Date('2021-02-25T17:54:22.000Z'),
error: null,
id: 'pl271FzxTTmGRW8Uj3dUWw'
});
});
it('works for errors', function () {
const event = {
event: 'failed',
id: 'pl271FzxTTmGRW8Uj3dUWw',
'log-level': 'error',
severity: 'permanent',
reason: 'suppress-bounce',
envelope: {
sender: 'john@example.org',
transport: 'smtp',
targets: 'joan@example.com'
},
flags: {
'is-routed': false,
'is-authenticated': true,
'is-system-test': false,
'is-test-mode': false
},
'delivery-status': {
'attempt-no': 1,
message: '',
code: 605,
description: 'Not delivering to previously bounced address',
'session-seconds': 0.0
},
message: {
headers: {
to: 'joan@example.com',
'message-id': 'testProviderId',
from: 'john@example.org',
subject: 'Test Subject'
},
attachments: [],
size: 867
},
storage: {
url: 'https://se.api.mailgun.net/v3/domains/example.org/messages/eyJwI...',
key: 'eyJwI...'
},
recipient: 'testRecipient',
'recipient-domain': 'mailgun.com',
campaigns: [],
tags: [],
'user-variables': {},
timestamp: 1614275662
};
const mailgunClient = new MailgunClient({config, settings});
const result = mailgunClient.normalizeEvent(event);
assert.deepStrictEqual(result, {
type: 'failed',
severity: 'permanent',
recipientEmail: 'testRecipient',
emailId: undefined,
providerId: 'testProviderId',
timestamp: new Date('2021-02-25T17:54:22.000Z'),
error: {
code: 605,
enhancedCode: null,
message: 'Not delivering to previously bounced address'
},
id: 'pl271FzxTTmGRW8Uj3dUWw'
});
});
it('works for enhanced errors', function () {
const event = {
event: 'failed',
id: 'pl271FzxTTmGRW8Uj3dUWw',
'log-level': 'error',
severity: 'permanent',
reason: 'suppress-bounce',
envelope: {
sender: 'john@example.org',
transport: 'smtp',
targets: 'joan@example.com'
},
flags: {
'is-routed': false,
'is-authenticated': true,
'is-system-test': false,
'is-test-mode': false
},
'delivery-status': {
tls: true,
'mx-host': 'hotmail-com.olc.protection.outlook.com',
code: 451,
description: '',
'session-seconds': 0.7517080307006836,
utf8: true,
'retry-seconds': 600,
'enhanced-code': '4.7.652',
'attempt-no': 1,
message: '4.7.652 The mail server [xxx.xxx.xxx.xxx] has exceeded the maximum number of connections.',
'certificate-verified': true
},
message: {
headers: {
to: 'joan@example.com',
'message-id': 'testProviderId',
from: 'john@example.org',
subject: 'Test Subject'
},
attachments: [],
size: 867
},
storage: {
url: 'https://se.api.mailgun.net/v3/domains/example.org/messages/eyJwI...',
key: 'eyJwI...'
},
recipient: 'testRecipient',
'recipient-domain': 'mailgun.com',
campaigns: [],
tags: [],
'user-variables': {},
timestamp: 1614275662
};
const mailgunClient = new MailgunClient({config, settings});
const result = mailgunClient.normalizeEvent(event);
assert.deepStrictEqual(result, {
type: 'failed',
severity: 'permanent',
recipientEmail: 'testRecipient',
emailId: undefined,
providerId: 'testProviderId',
timestamp: new Date('2021-02-25T17:54:22.000Z'),
error: {
code: 451,
enhancedCode: '4.7.652',
message: '4.7.652 The mail server [xxx.xxx.xxx.xxx] has exceeded the maximum number of connections.'
},
id: 'pl271FzxTTmGRW8Uj3dUWw'
});
});
});