Ghost/ghost/email-service/lib/email-event-processor.js
Simon Backx 923c522778
Implemented email analytics retrying (#16273)
fixes https://github.com/TryGhost/Team/issues/2562

New event fetching loops:
- Reworked the analytics fetching algorithm. Instead of starting again
where we stopped during the last fetching minus 30 minutes, we now just
continue where we stopped. But with ms precision (because no longer
database dependent after first fetch), and we stop at NOW - 1 minute to
reduce chance of missing events.
- Apart from that, a missing fetching loop is introduced. This fetches
events that are older than 30 minutes, and just processes all events a
second time to make sure we didn't skip any because of storage delays in
the Mailgun API.
- A new scheduled fetching loop, that allows us to schedule between a
given start/end date (currently only persisted in memory, so stops after
a reboot)

UI and endpoint changes:
- New UI to show the state of the analytics 'loops'
- New endpoint to request the analytics loop status
- New endpoint to schedule analytics
- New endpoint to cancel scheduled analytics
- Some number formatting improvements, and introduction of 'opened'
count in debug screen
- Live reload of data in the debug screen

Other changes:
- This also improves the support for maxEvents. We can now stop a
fetching loop after x events without worrying about lost events. This is
used to reduce the fetched events in the missing and scheduled event
loop (e.g. when the main one is fetching lots of events, we skip the
other loops).
- Prevents fetching the same events over and over again if no new events
come in (because we always started at the same begin timestamp). The
code increases the begin timestamp with 1 second if it is safe to do so,
to prevent the API from returning the same events over and over again.
- Some optimisations in handing the processing results (less merges to
reduce CPU usage in cases we have lots of events).

Testing:
- You can test with lots of events using the new mailgun mocking server
(Toolbox repo `scripts/mailgun-mock-server`). This can also simulate
events that are only returned after x minutes because of storage delays.
2023-02-20 16:44:13 +01:00

237 lines
7.8 KiB
JavaScript

const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, SpamComplaintEvent, EmailUnsubscribedEvent, EmailTemporaryBouncedEvent} = require('@tryghost/email-events');
async function waitForEvent() {
return new Promise((resolve) => {
setTimeout(resolve, 70);
});
}
/**
* @typedef EmailIdentification
* @property {string} email
* @property {string} providerId
* @property {string} [emailId] Optional email id
*/
/**
* @typedef EmailRecipientInformation
* @property {string} emailRecipientId
* @property {string} memberId
* @property {string} emailId
*/
/**
* @typedef EmailEventStorage
* @property {(event: EmailDeliveredEvent) => Promise<void>} handleDelivered
* @property {(event: EmailOpenedEvent) => Promise<void>} handleOpened
* @property {(event: EmailBouncedEvent) => Promise<void>} handlePermanentFailed
* @property {(event: EmailTemporaryBouncedEvent) => Promise<void>} handleTemporaryFailed
* @property {(event: EmailUnsubscribedEvent) => Promise<void>} handleUnsubscribed
* @property {(event: SpamComplaintEvent) => Promise<void>} handleComplained
*/
/**
* WARNING: this class is used in a separate thread (an offloaded job). Be careful when working with settings and models.
*/
class EmailEventProcessor {
#domainEvents;
#db;
#eventStorage;
constructor({domainEvents, db, eventStorage}) {
this.#domainEvents = domainEvents;
this.#db = db;
this.#eventStorage = eventStorage;
// Avoid having to query email_batch by provider_id for every event
this.providerIdEmailIdMap = {};
}
/**
* @param {EmailIdentification} emailIdentification
* @param {Date} timestamp
*/
async handleDelivered(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
const event = EmailDeliveredEvent.create({
email: emailIdentification.email,
emailRecipientId: recipient.emailRecipientId,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
});
await this.#eventStorage.handleDelivered(event);
this.#domainEvents.dispatch(event);
}
return recipient;
}
/**
* @param {EmailIdentification} emailIdentification
* @param {Date} timestamp
*/
async handleOpened(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
const event = EmailOpenedEvent.create({
email: emailIdentification.email,
emailRecipientId: recipient.emailRecipientId,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
});
this.#domainEvents.dispatch(event);
await this.#eventStorage.handleOpened(event);
}
return recipient;
}
/**
* @param {EmailIdentification} emailIdentification
* @param {{id: string, timestamp: Date, error: {code: number; message: string; enhandedCode: string|number} | null}} event
*/
async handleTemporaryFailed(emailIdentification, {timestamp, error, id}) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
const event = EmailTemporaryBouncedEvent.create({
id,
error,
email: emailIdentification.email,
memberId: recipient.memberId,
emailId: recipient.emailId,
emailRecipientId: recipient.emailRecipientId,
timestamp
});
await this.#eventStorage.handleTemporaryFailed(event);
this.#domainEvents.dispatch(event);
}
return recipient;
}
/**
* @param {EmailIdentification} emailIdentification
* @param {{id: string, timestamp: Date, error: {code: number; message: string; enhandedCode: string|number} | null}} event
*/
async handlePermanentFailed(emailIdentification, {timestamp, error, id}) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
const event = EmailBouncedEvent.create({
id,
error,
email: emailIdentification.email,
memberId: recipient.memberId,
emailId: recipient.emailId,
emailRecipientId: recipient.emailRecipientId,
timestamp
});
await this.#eventStorage.handlePermanentFailed(event);
this.#domainEvents.dispatch(event);
await waitForEvent(); // Avoids knex connection pool to run dry
}
return recipient;
}
/**
* @param {EmailIdentification} emailIdentification
* @param {Date} timestamp
*/
async handleUnsubscribed(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
const event = EmailUnsubscribedEvent.create({
email: emailIdentification.email,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
});
await this.#eventStorage.handleUnsubscribed(event);
this.#domainEvents.dispatch(event);
}
return recipient;
}
/**
* @param {EmailIdentification} emailIdentification
* @param {Date} timestamp
*/
async handleComplained(emailIdentification, timestamp) {
const recipient = await this.getRecipient(emailIdentification);
if (recipient) {
const event = SpamComplaintEvent.create({
email: emailIdentification.email,
memberId: recipient.memberId,
emailId: recipient.emailId,
timestamp
});
await this.#eventStorage.handleComplained(event);
this.#domainEvents.dispatch(event);
await waitForEvent(); // Avoids knex connection pool to run dry
}
return recipient;
}
/**
* @private
* @param {EmailIdentification} emailIdentification
* @returns {Promise<EmailRecipientInformation|undefined>}
*/
async getRecipient(emailIdentification) {
if (!emailIdentification.emailId && !emailIdentification.providerId) {
// Protection if both are null or undefined
return;
}
// With the provider_id and email address we can look for the EmailRecipient
const emailId = emailIdentification.emailId ?? await this.getEmailId(emailIdentification.providerId);
if (!emailId) {
// Invalid
return;
}
const {id: emailRecipientId, member_id: memberId} = await this.#db.knex('email_recipients')
.select('id', 'member_id')
.where('member_email', emailIdentification.email)
.where('email_id', emailId)
.first() || {};
if (emailRecipientId && memberId) {
return {
emailRecipientId,
memberId,
emailId
};
}
}
/**
* @private
* @param {string} providerId
* @returns {Promise<string|undefined>}
*/
async getEmailId(providerId) {
if (this.providerIdEmailIdMap[providerId]) {
return this.providerIdEmailIdMap[providerId];
}
const {emailId} = await this.#db.knex('email_batches')
.select('email_id as emailId')
.where('provider_id', providerId)
.first() || {};
if (!emailId) {
return;
}
this.providerIdEmailIdMap[providerId] = emailId;
return emailId;
}
}
module.exports = EmailEventProcessor;