✨ Updated email analytics job to prioritize open events (#20800)
ref https://linear.app/tryghost/issue/ENG-1477 - updated email analytics job to prioritize open events - put limits on non-open event fetching - updated job to now restart itself until processing is at a sufficiently low volume Previously the EmailAnalytics job would process all event data equally. When there's sufficient recipients (>20k), we could see delays in the open rate data in Admin because of all the delivered events being processed. Open events are far more important to users, so we've now prioritized processing those events before any others. Processing of events shouldn't be any faster or slower with this as this doesn't change throughput, just order. NOTE: Use the mailgun-mock-server in TryGhost/Toolbox for testing.
This commit is contained in:
parent
2f36d6a4de
commit
4267ff9be6
@ -57,11 +57,22 @@ class EmailAnalyticsServiceWrapper {
|
||||
});
|
||||
}
|
||||
|
||||
async fetchLatest({maxEvents} = {maxEvents: Infinity}) {
|
||||
logging.info('[EmailAnalytics] Fetch latest started');
|
||||
async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
|
||||
logging.info('[EmailAnalytics] Fetch latest opened events started');
|
||||
|
||||
const fetchStartDate = new Date();
|
||||
const totalEvents = await this.service.fetchLatest({maxEvents});
|
||||
const totalEvents = await this.service.fetchLatestOpenedEvents({maxEvents});
|
||||
const fetchEndDate = new Date();
|
||||
|
||||
logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest opens)`);
|
||||
return totalEvents;
|
||||
}
|
||||
|
||||
async fetchLatestNonOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
|
||||
logging.info('[EmailAnalytics] Fetch latest non-opened events started');
|
||||
|
||||
const fetchStartDate = new Date();
|
||||
const totalEvents = await this.service.fetchLatestNonOpenedEvents({maxEvents});
|
||||
const fetchEndDate = new Date();
|
||||
|
||||
logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest)`);
|
||||
@ -69,7 +80,7 @@ class EmailAnalyticsServiceWrapper {
|
||||
}
|
||||
|
||||
async fetchMissing({maxEvents} = {maxEvents: Infinity}) {
|
||||
logging.info('[EmailAnalytics] Fetch missing started');
|
||||
logging.info('[EmailAnalytics] Fetch missing events started');
|
||||
|
||||
const fetchStartDate = new Date();
|
||||
const totalEvents = await this.service.fetchMissing({maxEvents});
|
||||
@ -83,7 +94,7 @@ class EmailAnalyticsServiceWrapper {
|
||||
if (maxEvents < 300) {
|
||||
return 0;
|
||||
}
|
||||
logging.info('[EmailAnalytics] Fetch scheduled started');
|
||||
logging.info('[EmailAnalytics] Fetch scheduled events started');
|
||||
|
||||
const fetchStartDate = new Date();
|
||||
const totalEvents = await this.service.fetchScheduled({maxEvents});
|
||||
@ -100,13 +111,31 @@ class EmailAnalyticsServiceWrapper {
|
||||
}
|
||||
this.fetching = true;
|
||||
|
||||
// NOTE: Data shows we can process ~7500 events per minute on Pro; this can vary locally
|
||||
try {
|
||||
const c1 = await this.fetchLatest({maxEvents: Infinity});
|
||||
const c2 = await this.fetchMissing({maxEvents: Infinity});
|
||||
// Prioritize opens since they are the most important (only data directly displayed to users)
|
||||
await this.fetchLatestOpenedEvents({maxEvents: Infinity});
|
||||
|
||||
// Set limits on how much we fetch without checkings for opened events. During surge events (following newsletter send)
|
||||
// we want to make sure we don't spend too much time collecting delivery data.
|
||||
const c1 = await this.fetchLatestNonOpenedEvents({maxEvents: 20000});
|
||||
if (c1 > 15000) {
|
||||
this.fetching = false;
|
||||
logging.info('[EmailAnalytics] Restarting fetch due to high event count');
|
||||
this.startFetch();
|
||||
return;
|
||||
}
|
||||
const c2 = await this.fetchMissing({maxEvents: 20000});
|
||||
if ((c1 + c2) > 15000) {
|
||||
this.fetching = false;
|
||||
logging.info('[EmailAnalytics] Restarting fetch due to high event count');
|
||||
this.startFetch();
|
||||
return;
|
||||
}
|
||||
|
||||
// Only fetch scheduled if we didn't fetch a lot of normal events
|
||||
await this.fetchScheduled({maxEvents: 20000 - c1 - c2});
|
||||
|
||||
|
||||
this.fetching = false;
|
||||
} catch (e) {
|
||||
logging.error(e, 'Error while fetching email analytics');
|
||||
|
@ -11,30 +11,35 @@ module.exports = {
|
||||
return emailCount && emailCount.count > 0;
|
||||
},
|
||||
|
||||
async getLastSeenEventTimestamp() {
|
||||
/**
|
||||
* Retrieves the timestamp of the last seen event for the specified email analytics events.
|
||||
* @param {string[]} events - The email analytics events to consider (default: ['delivered', 'opened', 'failed']).
|
||||
* @returns {Promise<Date|null>} The timestamp of the last seen event, or null if no events are found.
|
||||
*/
|
||||
async getLastEventTimestamp(events = ['delivered', 'opened', 'failed']) {
|
||||
const startDate = new Date();
|
||||
|
||||
// three separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns
|
||||
let {maxDeliveredAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
|
||||
let {maxOpenedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
|
||||
let {maxFailedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};
|
||||
// separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns
|
||||
let {maxOpenedAt} = events.includes('opened') ? await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() : null;
|
||||
let {maxDeliveredAt} = events.includes('delivered') ? await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() : null;
|
||||
let {maxFailedAt} = events.includes('failed') ? await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() : null;
|
||||
|
||||
if (maxOpenedAt && !(maxOpenedAt instanceof Date)) {
|
||||
// SQLite returns a string instead of a Date
|
||||
maxOpenedAt = new Date(maxOpenedAt);
|
||||
}
|
||||
|
||||
if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) {
|
||||
// SQLite returns a string instead of a Date
|
||||
maxDeliveredAt = new Date(maxDeliveredAt);
|
||||
}
|
||||
|
||||
if (maxOpenedAt && !(maxOpenedAt instanceof Date)) {
|
||||
// SQLite returns a string instead of a Date
|
||||
maxOpenedAt = new Date(maxOpenedAt);
|
||||
}
|
||||
|
||||
if (maxFailedAt && !(maxFailedAt instanceof Date)) {
|
||||
// SQLite returns a string instead of a Date
|
||||
maxFailedAt = new Date(maxFailedAt);
|
||||
}
|
||||
|
||||
const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]);
|
||||
const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]);
|
||||
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);
|
||||
|
||||
return lastSeenEventTimestamp;
|
||||
|
@ -23,7 +23,7 @@ describe('EmailEventStorage', function () {
|
||||
before(async function () {
|
||||
// Stub queries before boot
|
||||
const queries = require('../../../../core/server/services/email-analytics/lib/queries');
|
||||
sinon.stub(queries, 'getLastSeenEventTimestamp').callsFake(async function () {
|
||||
sinon.stub(queries, 'getLastEventTimestamp').callsFake(async function () {
|
||||
// This is required because otherwise the last event timestamp will be now, and that is too close to NOW to start fetching new events
|
||||
return new Date(2000, 0, 1);
|
||||
});
|
||||
@ -78,7 +78,7 @@ describe('EmailEventStorage', function () {
|
||||
|
||||
// Fire event processing
|
||||
// We use offloading to have correct coverage and usage of worker thread
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestNonOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -125,7 +125,7 @@ describe('EmailEventStorage', function () {
|
||||
assert.equal(initialModel.get('delivered_at'), null);
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestNonOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -170,7 +170,7 @@ describe('EmailEventStorage', function () {
|
||||
assert.equal(initialModel.get('opened_at'), null);
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -250,7 +250,7 @@ describe('EmailEventStorage', function () {
|
||||
assert.notEqual(initialModel.get('delivered_at'), null);
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -346,7 +346,7 @@ describe('EmailEventStorage', function () {
|
||||
assert.notEqual(initialModel.get('delivered_at'), null);
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -439,7 +439,7 @@ describe('EmailEventStorage', function () {
|
||||
assert.notEqual(initialModel.get('failed_at'), null, 'This test requires a failed email recipient');
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -529,7 +529,7 @@ describe('EmailEventStorage', function () {
|
||||
assert.equal(initialModel.get('failed_at'), null);
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -645,7 +645,7 @@ describe('EmailEventStorage', function () {
|
||||
assert.equal(initialModel.get('failed_at'), null);
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -747,7 +747,7 @@ describe('EmailEventStorage', function () {
|
||||
}];
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -849,7 +849,7 @@ describe('EmailEventStorage', function () {
|
||||
}];
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -951,7 +951,7 @@ describe('EmailEventStorage', function () {
|
||||
}];
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -1015,7 +1015,7 @@ describe('EmailEventStorage', function () {
|
||||
}];
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -1074,7 +1074,7 @@ describe('EmailEventStorage', function () {
|
||||
}];
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
|
||||
// Since this is all event based we should wait for all dispatched events to be completed.
|
||||
@ -1118,7 +1118,7 @@ describe('EmailEventStorage', function () {
|
||||
}];
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 1);
|
||||
});
|
||||
|
||||
@ -1132,7 +1132,7 @@ describe('EmailEventStorage', function () {
|
||||
}];
|
||||
|
||||
// Fire event processing
|
||||
const result = await emailAnalytics.fetchLatest();
|
||||
const result = await emailAnalytics.fetchLatestOpenedEvents();
|
||||
assert.equal(result, 0);
|
||||
});
|
||||
});
|
||||
|
@ -44,7 +44,7 @@ describe('MailgunEmailSuppressionList', function () {
|
||||
recipient
|
||||
})];
|
||||
|
||||
await emailAnalytics.fetchLatest();
|
||||
await emailAnalytics.fetchLatestOpenedEvents();
|
||||
await DomainEvents.allSettled();
|
||||
|
||||
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
|
||||
@ -72,7 +72,7 @@ describe('MailgunEmailSuppressionList', function () {
|
||||
recipient
|
||||
})];
|
||||
|
||||
await emailAnalytics.fetchLatest();
|
||||
await emailAnalytics.fetchLatestOpenedEvents();
|
||||
await DomainEvents.allSettled();
|
||||
|
||||
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
|
||||
@ -100,7 +100,7 @@ describe('MailgunEmailSuppressionList', function () {
|
||||
recipient
|
||||
})];
|
||||
|
||||
await emailAnalytics.fetchLatest();
|
||||
await emailAnalytics.fetchLatestOpenedEvents();
|
||||
await DomainEvents.allSettled();
|
||||
|
||||
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
|
||||
@ -128,7 +128,7 @@ describe('MailgunEmailSuppressionList', function () {
|
||||
recipient
|
||||
})];
|
||||
|
||||
await emailAnalytics.fetchLatest();
|
||||
await emailAnalytics.fetchLatestOpenedEvents();
|
||||
await DomainEvents.allSettled();
|
||||
|
||||
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
|
||||
@ -163,7 +163,7 @@ describe('MailgunEmailSuppressionList', function () {
|
||||
timestamp: Math.round(timestamp.getTime() / 1000)
|
||||
}];
|
||||
|
||||
await emailAnalytics.fetchLatest();
|
||||
await emailAnalytics.fetchLatestOpenedEvents();
|
||||
await DomainEvents.allSettled();
|
||||
|
||||
const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`);
|
||||
|
@ -1,6 +1,6 @@
|
||||
const MailgunClient = require('@tryghost/mailgun-client');
|
||||
|
||||
const EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained';
|
||||
const DEFAULT_EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained';
|
||||
const PAGE_LIMIT = 300;
|
||||
const DEFAULT_TAGS = ['bulk-email'];
|
||||
|
||||
@ -26,11 +26,12 @@ class EmailAnalyticsProviderMailgun {
|
||||
* @param {Number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
|
||||
* @param {Date} [options.begin]
|
||||
* @param {Date} [options.end]
|
||||
* @param {String[]} [options.events]
|
||||
*/
|
||||
fetchLatest(batchHandler, options) {
|
||||
const mailgunOptions = {
|
||||
limit: PAGE_LIMIT,
|
||||
event: EVENT_FILTER,
|
||||
event: options?.events ? options.events.join(' OR ') : DEFAULT_EVENT_FILTER,
|
||||
tags: this.tags.join(' AND '),
|
||||
begin: options.begin ? options.begin.getTime() / 1000 : undefined,
|
||||
end: options.end ? options.end.getTime() / 1000 : undefined,
|
||||
|
@ -155,5 +155,28 @@ describe('EmailAnalyticsProviderMailgun', function () {
|
||||
tags: 'bulk-email AND custom-tag'
|
||||
}, batchHandler, {maxEvents: undefined});
|
||||
});
|
||||
|
||||
it('uses provided events when supplied', async function () {
|
||||
const configStub = sinon.stub(config, 'get');
|
||||
configStub.withArgs('bulkEmail').returns({
|
||||
mailgun: {
|
||||
apiKey: 'apiKey',
|
||||
domain: 'domain.com',
|
||||
baseUrl: 'https://api.mailgun.net/v3'
|
||||
}
|
||||
});
|
||||
const mailgunProvider = new EmailAnalyticsProviderMailgun({config, settings});
|
||||
|
||||
const batchHandler = sinon.spy();
|
||||
const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS);
|
||||
|
||||
await mailgunProvider.fetchLatest(batchHandler, {events: ['delivered'], begin: LATEST_TIMESTAMP});
|
||||
|
||||
sinon.assert.calledWithExactly(mailgunFetchEventsStub, {
|
||||
...MAILGUN_OPTIONS,
|
||||
event: 'delivered',
|
||||
tags: 'bulk-email'
|
||||
}, batchHandler, {maxEvents: undefined});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -32,13 +32,18 @@ module.exports = class EmailAnalyticsService {
|
||||
/**
|
||||
* @type {FetchData}
|
||||
*/
|
||||
#fetchLatestData = null;
|
||||
#fetchLatestNonOpenedData = null;
|
||||
|
||||
/**
|
||||
* @type {FetchData}
|
||||
*/
|
||||
#fetchMissingData = null;
|
||||
|
||||
/**
|
||||
* @type {FetchData}
|
||||
*/
|
||||
#fetchLatestOpenedData = null;
|
||||
|
||||
/**
|
||||
* @type {FetchDataScheduled}
|
||||
*/
|
||||
@ -58,38 +63,64 @@ module.exports = class EmailAnalyticsService {
|
||||
|
||||
getStatus() {
|
||||
return {
|
||||
latest: this.#fetchLatestData,
|
||||
latest: this.#fetchLatestNonOpenedData,
|
||||
missing: this.#fetchMissingData,
|
||||
scheduled: this.#fetchScheduledData
|
||||
scheduled: this.#fetchScheduledData,
|
||||
latestOpened: this.#fetchLatestOpenedData
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the timestamp of the last event we processed. Defaults to now minus 30 minutes if we have no data yet.
|
||||
*/
|
||||
async getLastEventTimestamp() {
|
||||
return this.#fetchLatestData?.lastEventTimestamp ?? (await this.queries.getLastSeenEventTimestamp()) ?? new Date(Date.now() - TRUST_THRESHOLD_MS);
|
||||
async getLastNonOpenedEventTimestamp() {
|
||||
return this.#fetchLatestNonOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(['delivered','failed'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS);
|
||||
}
|
||||
|
||||
async fetchLatest({maxEvents = Infinity} = {}) {
|
||||
async getLastOpenedEventTimestamp() {
|
||||
return this.#fetchLatestOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(['opened'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS);
|
||||
}
|
||||
|
||||
async fetchLatestOpenedEvents({maxEvents = Infinity} = {}) {
|
||||
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
|
||||
const begin = await this.getLastEventTimestamp();
|
||||
const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // ALways stop at x minutes ago to give Mailgun a bit more time to stabilize storage
|
||||
const begin = await this.getLastOpenedEventTimestamp();
|
||||
const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage
|
||||
|
||||
if (end <= begin) {
|
||||
// Skip for now
|
||||
logging.info('[EmailAnalytics] Skipping fetchLatest because end (' + end + ') is before begin (' + begin + ')');
|
||||
logging.info('[EmailAnalytics] Skipping fetchLatestOpenedEvents because end (' + end + ') is before begin (' + begin + ')');
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Create the fetch data object if it doesn't exist yet
|
||||
if (!this.#fetchLatestData) {
|
||||
this.#fetchLatestData = {
|
||||
if (!this.#fetchLatestOpenedData) {
|
||||
this.#fetchLatestOpenedData = {
|
||||
running: false
|
||||
};
|
||||
}
|
||||
|
||||
return await this.#fetchEvents(this.#fetchLatestData, {begin, end, maxEvents});
|
||||
return await this.#fetchEvents(this.#fetchLatestOpenedData, {begin, end, maxEvents, events: ['opened']});
|
||||
}
|
||||
|
||||
async fetchLatestNonOpenedEvents({maxEvents = Infinity} = {}) {
|
||||
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
|
||||
const begin = await this.getLastNonOpenedEventTimestamp();
|
||||
const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage
|
||||
|
||||
if (end <= begin) {
|
||||
// Skip for now
|
||||
logging.info('[EmailAnalytics] Skipping fetchLatestNonOpenedEvents because end (' + end + ') is before begin (' + begin + ')');
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Create the fetch data object if it doesn't exist yet
|
||||
if (!this.#fetchLatestNonOpenedData) {
|
||||
this.#fetchLatestNonOpenedData = {
|
||||
running: false
|
||||
};
|
||||
}
|
||||
|
||||
return await this.#fetchEvents(this.#fetchLatestNonOpenedData, {begin, end, maxEvents, events: ['delivered', 'failed', 'unsubscribed', 'complained']});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -101,11 +132,11 @@ module.exports = class EmailAnalyticsService {
|
||||
// We start where we left of, or 1,5h ago after a server restart
|
||||
const begin = this.#fetchMissingData?.lastEventTimestamp ?? this.#fetchMissingData?.lastBegin ?? new Date(Date.now() - TRUST_THRESHOLD_MS * 3);
|
||||
|
||||
// Always stop at the time the fetchLatest started fetching on, or maximum 30 minutes ago
|
||||
// Always stop at the earlier of the time the fetchLatest started fetching on or 30 minutes ago
|
||||
const end = new Date(
|
||||
Math.min(
|
||||
Date.now() - TRUST_THRESHOLD_MS,
|
||||
this.#fetchLatestData?.lastBegin?.getTime()
|
||||
this.#fetchLatestNonOpenedData?.lastBegin?.getTime()
|
||||
)
|
||||
);
|
||||
|
||||
@ -200,8 +231,9 @@ module.exports = class EmailAnalyticsService {
|
||||
* @param {Date} options.begin
|
||||
* @param {Date} options.end
|
||||
* @param {number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
|
||||
* @param {String[]} [options.eventTypes] Only fetch these events, ['delivered', 'opened', 'failed', 'unsubscribed', 'complained']
|
||||
*/
|
||||
async #fetchEvents(fetchData, {begin, end, maxEvents = Infinity}) {
|
||||
async #fetchEvents(fetchData, {begin, end, maxEvents = Infinity, eventTypes = null}) {
|
||||
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
|
||||
logging.info('[EmailAnalytics] Fetching from ' + begin.toISOString() + ' until ' + end.toISOString() + ' (maxEvents: ' + maxEvents + ')');
|
||||
|
||||
@ -246,7 +278,7 @@ module.exports = class EmailAnalyticsService {
|
||||
|
||||
try {
|
||||
for (const provider of this.providers) {
|
||||
await provider.fetchLatest(processBatch, {begin, end, maxEvents});
|
||||
await provider.fetchLatest(processBatch, {begin, end, maxEvents, events: eventTypes});
|
||||
}
|
||||
|
||||
logging.info('[EmailAnalytics] Fetching finished');
|
||||
|
@ -10,39 +10,126 @@ const {
|
||||
const EventProcessingResult = require('../lib/EventProcessingResult');
|
||||
|
||||
describe('EmailAnalyticsService', function () {
|
||||
let eventProcessor;
|
||||
beforeEach(function () {
|
||||
eventProcessor = {};
|
||||
eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => {
|
||||
return {
|
||||
emailId,
|
||||
emailRecipientId: emailId,
|
||||
memberId: 1
|
||||
};
|
||||
});
|
||||
eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => {
|
||||
return {
|
||||
emailId,
|
||||
emailRecipientId: emailId,
|
||||
memberId: 1
|
||||
};
|
||||
describe('getStatus', function () {
|
||||
it('returns status object', function () {
|
||||
// these are null because we're not running them before calling this
|
||||
const service = new EmailAnalyticsService({});
|
||||
const result = service.getStatus();
|
||||
result.should.deepEqual({
|
||||
latest: null,
|
||||
missing: null,
|
||||
scheduled: null,
|
||||
latestOpened: null
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('fetchLatest', function () {
|
||||
describe('getLastNonOpenedEventTimestamp', function () {
|
||||
it('returns the queried timestamp before the fallback', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
queries: {
|
||||
getLastEventTimestamp: sinon.stub().resolves(new Date(1))
|
||||
}
|
||||
});
|
||||
|
||||
const result = await service.getLastNonOpenedEventTimestamp();
|
||||
result.should.eql(new Date(1));
|
||||
});
|
||||
|
||||
it('returns the fallback if nothing is found', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
queries: {
|
||||
getLastEventTimestamp: sinon.stub().resolves(null)
|
||||
}
|
||||
});
|
||||
|
||||
const result = await service.getLastNonOpenedEventTimestamp();
|
||||
result.should.eql(new Date(Date.now() - 30 * 60 * 1000)); // should be 30 mins prior
|
||||
});
|
||||
});
|
||||
|
||||
describe('getLastSeenOpenedEventTimestamp', function () {
|
||||
it('returns the queried timestamp before the fallback', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
queries: {
|
||||
getLastEventTimestamp: sinon.stub().resolves(new Date(1))
|
||||
}
|
||||
});
|
||||
|
||||
const result = await service.getLastOpenedEventTimestamp();
|
||||
result.should.eql(new Date(1));
|
||||
});
|
||||
|
||||
it('returns the fallback if nothing is found', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
queries: {
|
||||
getLastEventTimestamp: sinon.stub().resolves(null)
|
||||
}
|
||||
});
|
||||
|
||||
const result = await service.getLastOpenedEventTimestamp();
|
||||
result.should.eql(new Date(Date.now() - 30 * 60 * 1000)); // should be 30 mins prior
|
||||
});
|
||||
|
||||
it.skip('returns the cached value before the fallback', async function () {
|
||||
});
|
||||
});
|
||||
|
||||
describe('processEventBatch', function () {
|
||||
let eventProcessor;
|
||||
beforeEach(function () {
|
||||
eventProcessor = {};
|
||||
eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => {
|
||||
return {
|
||||
emailId,
|
||||
emailRecipientId: emailId,
|
||||
memberId: 1
|
||||
};
|
||||
});
|
||||
eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => {
|
||||
return {
|
||||
emailId,
|
||||
emailRecipientId: emailId,
|
||||
memberId: 1
|
||||
};
|
||||
});
|
||||
eventProcessor.handlePermanentFailed = sinon.stub().callsFake(({emailId}) => {
|
||||
return {
|
||||
emailId,
|
||||
emailRecipientId: emailId,
|
||||
memberId: 1
|
||||
};
|
||||
});
|
||||
eventProcessor.handleTemporaryFailed = sinon.stub().callsFake(({emailId}) => {
|
||||
return {
|
||||
emailId,
|
||||
emailRecipientId: emailId,
|
||||
memberId: 1
|
||||
};
|
||||
});
|
||||
eventProcessor.handleUnsubscribed = sinon.stub().callsFake(({emailId}) => {
|
||||
return {
|
||||
emailId,
|
||||
emailRecipientId: emailId,
|
||||
memberId: 1
|
||||
};
|
||||
});
|
||||
eventProcessor.handleComplained = sinon.stub().callsFake(({emailId}) => {
|
||||
return {
|
||||
emailId,
|
||||
emailRecipientId: emailId,
|
||||
memberId: 1
|
||||
};
|
||||
});
|
||||
});
|
||||
|
||||
it('uses passed-in event processor', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
eventProcessor
|
||||
});
|
||||
|
||||
const result = new EventProcessingResult();
|
||||
const fetchData = {
|
||||
|
||||
};
|
||||
const fetchData = {};
|
||||
await service.processEventBatch([{
|
||||
type: 'delivered',
|
||||
emailId: 1,
|
||||
@ -58,6 +145,7 @@ describe('EmailAnalyticsService', function () {
|
||||
}], result, fetchData);
|
||||
|
||||
eventProcessor.handleDelivered.callCount.should.eql(2);
|
||||
eventProcessor.handleOpened.callCount.should.eql(1);
|
||||
|
||||
result.should.deepEqual(new EventProcessingResult({
|
||||
delivered: 2,
|
||||
@ -71,6 +159,207 @@ describe('EmailAnalyticsService', function () {
|
||||
lastEventTimestamp: new Date(3)
|
||||
});
|
||||
});
|
||||
|
||||
it('handles opened', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
eventProcessor
|
||||
});
|
||||
|
||||
const result = new EventProcessingResult();
|
||||
const fetchData = {};
|
||||
|
||||
await service.processEventBatch([{
|
||||
type: 'opened',
|
||||
emailId: 1,
|
||||
timestamp: new Date(1)
|
||||
}], result, fetchData);
|
||||
|
||||
eventProcessor.handleOpened.calledOnce.should.be.true();
|
||||
|
||||
result.should.deepEqual(new EventProcessingResult({
|
||||
delivered: 0,
|
||||
opened: 1,
|
||||
unprocessable: 0,
|
||||
emailIds: [1],
|
||||
memberIds: [1]
|
||||
}));
|
||||
|
||||
fetchData.should.deepEqual({
|
||||
lastEventTimestamp: new Date(1)
|
||||
});
|
||||
});
|
||||
|
||||
it('handles delivered', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
eventProcessor
|
||||
});
|
||||
|
||||
const result = new EventProcessingResult();
|
||||
const fetchData = {};
|
||||
|
||||
await service.processEventBatch([{
|
||||
type: 'delivered',
|
||||
emailId: 1,
|
||||
timestamp: new Date(1)
|
||||
}], result, fetchData);
|
||||
|
||||
eventProcessor.handleDelivered.calledOnce.should.be.true();
|
||||
|
||||
result.should.deepEqual(new EventProcessingResult({
|
||||
delivered: 1,
|
||||
opened: 0,
|
||||
unprocessable: 0,
|
||||
emailIds: [1],
|
||||
memberIds: [1]
|
||||
}));
|
||||
|
||||
fetchData.should.deepEqual({
|
||||
lastEventTimestamp: new Date(1)
|
||||
});
|
||||
});
|
||||
|
||||
it('handles failed (permanent)', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
eventProcessor
|
||||
});
|
||||
|
||||
const result = new EventProcessingResult();
|
||||
const fetchData = {};
|
||||
|
||||
await service.processEventBatch([{
|
||||
type: 'failed',
|
||||
severity: 'permanent',
|
||||
emailId: 1,
|
||||
timestamp: new Date(1)
|
||||
}], result, fetchData);
|
||||
|
||||
eventProcessor.handlePermanentFailed.calledOnce.should.be.true();
|
||||
|
||||
result.should.deepEqual(new EventProcessingResult({
|
||||
permanentFailed: 1,
|
||||
emailIds: [1],
|
||||
memberIds: [1]
|
||||
}));
|
||||
|
||||
fetchData.should.deepEqual({
|
||||
lastEventTimestamp: new Date(1)
|
||||
});
|
||||
});
|
||||
|
||||
it('handles failed (temporary)', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
eventProcessor
|
||||
});
|
||||
|
||||
const result = new EventProcessingResult();
|
||||
const fetchData = {};
|
||||
|
||||
await service.processEventBatch([{
|
||||
type: 'failed',
|
||||
severity: 'temporary',
|
||||
emailId: 1,
|
||||
timestamp: new Date(1)
|
||||
}], result, fetchData);
|
||||
|
||||
eventProcessor.handleTemporaryFailed.calledOnce.should.be.true();
|
||||
|
||||
result.should.deepEqual(new EventProcessingResult({
|
||||
temporaryFailed: 1,
|
||||
emailIds: [1],
|
||||
memberIds: [1]
|
||||
}));
|
||||
|
||||
fetchData.should.deepEqual({
|
||||
lastEventTimestamp: new Date(1)
|
||||
});
|
||||
});
|
||||
|
||||
it('handles unsubscribed', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
eventProcessor
|
||||
});
|
||||
|
||||
const result = new EventProcessingResult();
|
||||
const fetchData = {};
|
||||
|
||||
await service.processEventBatch([{
|
||||
type: 'unsubscribed',
|
||||
emailId: 1,
|
||||
timestamp: new Date(1)
|
||||
}], result, fetchData);
|
||||
|
||||
eventProcessor.handleUnsubscribed.calledOnce.should.be.true();
|
||||
eventProcessor.handleDelivered.called.should.be.false();
|
||||
eventProcessor.handleOpened.called.should.be.false();
|
||||
|
||||
result.should.deepEqual(new EventProcessingResult({
|
||||
unsubscribed: 1,
|
||||
emailIds: [1],
|
||||
memberIds: [1]
|
||||
}));
|
||||
|
||||
fetchData.should.deepEqual({
|
||||
lastEventTimestamp: new Date(1)
|
||||
});
|
||||
});
|
||||
|
||||
it('handles complained', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
eventProcessor
|
||||
});
|
||||
|
||||
const result = new EventProcessingResult();
|
||||
const fetchData = {};
|
||||
|
||||
await service.processEventBatch([{
|
||||
type: 'complained',
|
||||
emailId: 1,
|
||||
timestamp: new Date(1)
|
||||
}], result, fetchData);
|
||||
|
||||
eventProcessor.handleComplained.calledOnce.should.be.true();
|
||||
eventProcessor.handleDelivered.called.should.be.false();
|
||||
eventProcessor.handleOpened.called.should.be.false();
|
||||
|
||||
result.should.deepEqual(new EventProcessingResult({
|
||||
complained: 1,
|
||||
emailIds: [1],
|
||||
memberIds: [1]
|
||||
}));
|
||||
|
||||
fetchData.should.deepEqual({
|
||||
lastEventTimestamp: new Date(1)
|
||||
});
|
||||
});
|
||||
|
||||
it(`doens't handle other event types`, async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
eventProcessor
|
||||
});
|
||||
|
||||
const result = new EventProcessingResult();
|
||||
const fetchData = {};
|
||||
|
||||
await service.processEventBatch([{
|
||||
type: 'notstandard',
|
||||
emailId: 1,
|
||||
timestamp: new Date(1)
|
||||
}], result, fetchData);
|
||||
|
||||
eventProcessor.handleDelivered.called.should.be.false();
|
||||
eventProcessor.handleOpened.called.should.be.false();
|
||||
|
||||
result.should.deepEqual(new EventProcessingResult({
|
||||
unhandled: 1
|
||||
}));
|
||||
|
||||
fetchData.should.deepEqual({
|
||||
lastEventTimestamp: new Date(1)
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('processEvent', function () {
|
||||
});
|
||||
|
||||
describe('aggregateStats', function () {
|
||||
@ -100,4 +389,34 @@ describe('EmailAnalyticsService', function () {
|
||||
service.queries.aggregateMemberStats.calledWith('m-2').should.be.true();
|
||||
});
|
||||
});
|
||||
|
||||
describe('aggregateEmailStats', function () {
|
||||
it('returns the query result', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
queries: {
|
||||
aggregateEmailStats: sinon.stub().resolves()
|
||||
}
|
||||
});
|
||||
|
||||
await service.aggregateEmailStats('memberId');
|
||||
|
||||
service.queries.aggregateEmailStats.calledOnce.should.be.true();
|
||||
service.queries.aggregateEmailStats.calledWith('memberId').should.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe('aggregateMemberStats', function () {
|
||||
it('returns the query result', async function () {
|
||||
const service = new EmailAnalyticsService({
|
||||
queries: {
|
||||
aggregateMemberStats: sinon.stub().resolves()
|
||||
}
|
||||
});
|
||||
|
||||
await service.aggregateMemberStats('memberId');
|
||||
|
||||
service.queries.aggregateMemberStats.calledOnce.should.be.true();
|
||||
service.queries.aggregateMemberStats.calledWith('memberId').should.be.true;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user