✨ Improved email analytics jobs system (#20809)
ref https://linear.app/tryghost/issue/ENG-952 - added persistence to the job timestamps This set of changes reduces the potential for gaps in our email event processing by adding persistence to the job timestamps. This avoids expensive queries on the `email_recipients` table after every boot, and reduces reliance on fallbacks in periods of heavy processing or reboot. This is our first use of the jobs table to create a persistent line, instead of its initial use case of single-run jobs. We may expand this capability and move to use of the jobs model over knex.raw in order to make this a bit friendlier. Note: this works with sqlite but datetimes are stored as ints. It still works fine. https://github.com/knex/knex/pull/5272
This commit is contained in:
parent
827518c98b
commit
0053939185
@ -1,9 +1,14 @@
|
|||||||
const _ = require('lodash');
|
const _ = require('lodash');
|
||||||
const debug = require('@tryghost/debug')('services:email-analytics');
|
const debug = require('@tryghost/debug')('services:email-analytics');
|
||||||
const db = require('../../../data/db');
|
const db = require('../../../data/db');
|
||||||
|
const logging = require('@tryghost/logging');
|
||||||
|
const {default: ObjectID} = require('bson-objectid');
|
||||||
|
|
||||||
const MIN_EMAIL_COUNT_FOR_OPEN_RATE = 5;
|
const MIN_EMAIL_COUNT_FOR_OPEN_RATE = 5;
|
||||||
|
|
||||||
|
/** @typedef {'email-analytics-latest-opened'|'email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-scheduled'} EmailAnalyticsJobName */
|
||||||
|
/** @typedef {'delivered'|'opened'|'failed'} EmailAnalyticsEvent */
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
async shouldFetchStats() {
|
async shouldFetchStats() {
|
||||||
// don't fetch stats from Mailgun if we haven't sent any emails
|
// don't fetch stats from Mailgun if we haven't sent any emails
|
||||||
@ -13,31 +18,52 @@ module.exports = {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves the timestamp of the last seen event for the specified email analytics events.
|
* Retrieves the timestamp of the last seen event for the specified email analytics events.
|
||||||
* @param {string[]} events - The email analytics events to consider (default: ['delivered', 'opened', 'failed']).
|
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
|
||||||
|
* @param {EmailAnalyticsEvent[]} [events=['delivered', 'opened', 'failed']] - The email analytics events to consider.
|
||||||
* @returns {Promise<Date|null>} The timestamp of the last seen event, or null if no events are found.
|
* @returns {Promise<Date|null>} The timestamp of the last seen event, or null if no events are found.
|
||||||
*/
|
*/
|
||||||
async getLastEventTimestamp(events = ['delivered', 'opened', 'failed']) {
|
async getLastEventTimestamp(jobName, events = ['delivered', 'opened', 'failed']) {
|
||||||
const startDate = new Date();
|
const startDate = new Date();
|
||||||
|
|
||||||
|
let maxOpenedAt;
|
||||||
|
let maxDeliveredAt;
|
||||||
|
let maxFailedAt;
|
||||||
|
|
||||||
// separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns
|
const jobData = await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first();
|
||||||
let maxOpenedAt = events.includes('opened') ? (await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first()).maxOpenedAt : null;
|
|
||||||
let maxDeliveredAt = events.includes('delivered') ? (await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first()).maxDeliveredAt : null;
|
|
||||||
let maxFailedAt = events.includes('failed') ? (await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first()).maxFailedAt : null;
|
|
||||||
|
|
||||||
if (maxOpenedAt && !(maxOpenedAt instanceof Date)) {
|
if (jobData) {
|
||||||
// SQLite returns a string instead of a Date
|
debug(`Using job data for ${jobName}`);
|
||||||
maxOpenedAt = new Date(maxOpenedAt);
|
const lastJobTimestamp = jobData.finished_at || jobData.started_at;
|
||||||
|
maxOpenedAt = events.includes('opened') ? lastJobTimestamp : null;
|
||||||
|
maxDeliveredAt = events.includes('delivered') ? lastJobTimestamp : null;
|
||||||
|
maxFailedAt = events.includes('failed') ? lastJobTimestamp : null;
|
||||||
|
} else {
|
||||||
|
debug(`Job data not found for ${jobName}, using email_recipients data`);
|
||||||
|
logging.info(`Job data not found for ${jobName}, using email_recipients data`);
|
||||||
|
if (events.includes('opened')) {
|
||||||
|
maxOpenedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first()).maxOpenedAt;
|
||||||
|
}
|
||||||
|
if (events.includes('delivered')) {
|
||||||
|
maxDeliveredAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first()).maxDeliveredAt;
|
||||||
|
}
|
||||||
|
if (events.includes('failed')) {
|
||||||
|
maxFailedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first()).maxFailedAt;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert a new job row if it doesn't exist
|
||||||
|
await db.knex('jobs').insert({
|
||||||
|
id: new ObjectID().toHexString(),
|
||||||
|
name: jobName,
|
||||||
|
started_at: new Date(),
|
||||||
|
created_at: new Date(),
|
||||||
|
status: 'started'
|
||||||
|
}).onConflict('name').ignore();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) {
|
// Convert string dates to Date objects for SQLite compatibility
|
||||||
// SQLite returns a string instead of a Date
|
[maxOpenedAt, maxDeliveredAt, maxFailedAt] = [maxOpenedAt, maxDeliveredAt, maxFailedAt].map(date => (
|
||||||
maxDeliveredAt = new Date(maxDeliveredAt);
|
date && !(date instanceof Date) ? new Date(date) : date
|
||||||
}
|
));
|
||||||
|
|
||||||
if (maxFailedAt && !(maxFailedAt instanceof Date)) {
|
|
||||||
// SQLite returns a string instead of a Date
|
|
||||||
maxFailedAt = new Date(maxFailedAt);
|
|
||||||
}
|
|
||||||
|
|
||||||
const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]);
|
const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]);
|
||||||
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);
|
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);
|
||||||
@ -45,6 +71,71 @@ module.exports = {
|
|||||||
return lastSeenEventTimestamp;
|
return lastSeenEventTimestamp;
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the timestamp of the last seen event for the specified email analytics events.
|
||||||
|
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
|
||||||
|
* @param {'completed'|'started'} field - The field to update.
|
||||||
|
* @param {Date} date - The timestamp of the last seen event.
|
||||||
|
* @returns {Promise<void>}
|
||||||
|
* @description
|
||||||
|
* Updates the `finished_at` or `started_at` column of the specified job in the `jobs` table with the provided timestamp.
|
||||||
|
* This is used to keep track of the last time the job was run to avoid expensive queries following reboot.
|
||||||
|
*/
|
||||||
|
async setJobTimestamp(jobName, field, date) {
|
||||||
|
// Convert string dates to Date objects for SQLite compatibility
|
||||||
|
try {
|
||||||
|
debug(`Setting ${field} timestamp for job ${jobName} to ${date}`);
|
||||||
|
const updateField = field === 'completed' ? 'finished_at' : 'started_at';
|
||||||
|
const status = field === 'completed' ? 'finished' : 'started';
|
||||||
|
const result = await db.knex('jobs').update({[updateField]: date, updated_at: new Date(), status: status}).where('name', jobName);
|
||||||
|
if (result === 0) {
|
||||||
|
await db.knex('jobs').insert({
|
||||||
|
id: new ObjectID().toHexString(),
|
||||||
|
name: jobName,
|
||||||
|
[updateField]: date,
|
||||||
|
updated_at: date,
|
||||||
|
status: status
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
debug(`Error setting ${field} timestamp for job ${jobName}: ${err.message}`);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the status of the specified email analytics job.
|
||||||
|
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
|
||||||
|
* @param {'started'|'finished'|'failed'} status - The new status of the job.
|
||||||
|
* @returns {Promise<void>}
|
||||||
|
* @description
|
||||||
|
* Updates the `status` column of the specified job in the `jobs` table with the provided status.
|
||||||
|
* This is used to keep track of the current state of the job.
|
||||||
|
*/
|
||||||
|
async setJobStatus(jobName, status) {
|
||||||
|
debug(`Setting status for job ${jobName} to ${status}`);
|
||||||
|
try {
|
||||||
|
const result = await db.knex('jobs')
|
||||||
|
.update({
|
||||||
|
status: status,
|
||||||
|
updated_at: new Date()
|
||||||
|
})
|
||||||
|
.where('name', jobName);
|
||||||
|
|
||||||
|
if (result === 0) {
|
||||||
|
await db.knex('jobs').insert({
|
||||||
|
id: new ObjectID().toHexString(),
|
||||||
|
name: jobName,
|
||||||
|
status: status,
|
||||||
|
created_at: new Date(),
|
||||||
|
updated_at: new Date()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
debug(`Error setting status for job ${jobName}: ${err.message}`);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
async aggregateEmailStats(emailId) {
|
async aggregateEmailStats(emailId) {
|
||||||
const {totalCount} = await db.knex('emails').select(db.knex.raw('email_count as totalCount')).where('id', emailId).first() || {totalCount: 0};
|
const {totalCount} = await db.knex('emails').select(db.knex.raw('email_count as totalCount')).where('id', emailId).first() || {totalCount: 0};
|
||||||
// use IS NULL here because that will typically match far fewer rows than IS NOT NULL making the query faster
|
// use IS NULL here because that will typically match far fewer rows than IS NOT NULL making the query faster
|
||||||
@ -83,4 +174,4 @@ module.exports = {
|
|||||||
.update(updateQuery)
|
.update(updateQuery)
|
||||||
.where('id', memberId);
|
.where('id', memberId);
|
||||||
}
|
}
|
||||||
};
|
};
|
@ -9,6 +9,7 @@ const errors = require('@tryghost/errors');
|
|||||||
/**
|
/**
|
||||||
* @typedef {object} FetchData
|
* @typedef {object} FetchData
|
||||||
* @property {boolean} running
|
* @property {boolean} running
|
||||||
|
* @property {('email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-latest-opened'|'email-analytics-scheduled')} jobName Name of the job that is running
|
||||||
* @property {Date} [lastStarted] Date the last fetch started on
|
* @property {Date} [lastStarted] Date the last fetch started on
|
||||||
* @property {Date} [lastBegin] The begin time used during the last fetch
|
* @property {Date} [lastBegin] The begin time used during the last fetch
|
||||||
* @property {Date} [lastEventTimestamp]
|
* @property {Date} [lastEventTimestamp]
|
||||||
@ -16,7 +17,11 @@ const errors = require('@tryghost/errors');
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @typedef {FetchData & {schedule: {begin: Date, end: Date}}} FetchDataScheduled
|
* @typedef {FetchData & {schedule?: {begin: Date, end: Date}}} FetchDataScheduled
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef {'delivered' | 'opened' | 'failed' | 'unsubscribed' | 'complained'} EmailAnalyticsEvent
|
||||||
*/
|
*/
|
||||||
|
|
||||||
const TRUST_THRESHOLD_MS = 30 * 60 * 1000; // 30 minutes
|
const TRUST_THRESHOLD_MS = 30 * 60 * 1000; // 30 minutes
|
||||||
@ -32,26 +37,42 @@ module.exports = class EmailAnalyticsService {
|
|||||||
/**
|
/**
|
||||||
* @type {FetchData}
|
* @type {FetchData}
|
||||||
*/
|
*/
|
||||||
#fetchLatestNonOpenedData = null;
|
#fetchLatestNonOpenedData = {
|
||||||
|
running: false,
|
||||||
|
jobName: 'email-analytics-latest-others'
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @type {FetchData}
|
* @type {FetchData}
|
||||||
*/
|
*/
|
||||||
#fetchMissingData = null;
|
#fetchMissingData = {
|
||||||
|
running: false,
|
||||||
|
jobName: 'email-analytics-missing'
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @type {FetchData}
|
* @type {FetchData}
|
||||||
*/
|
*/
|
||||||
#fetchLatestOpenedData = null;
|
#fetchLatestOpenedData = {
|
||||||
|
running: false,
|
||||||
|
jobName: 'email-analytics-latest-opened'
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @type {FetchDataScheduled}
|
* @type {FetchDataScheduled}
|
||||||
*/
|
*/
|
||||||
#fetchScheduledData = null;
|
#fetchScheduledData = {
|
||||||
|
running: false,
|
||||||
|
jobName: 'email-analytics-scheduled'
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {object} dependencies
|
* @param {object} dependencies
|
||||||
|
* @param {object} dependencies.config
|
||||||
|
* @param {object} dependencies.settings
|
||||||
|
* @param {object} dependencies.queries
|
||||||
* @param {EmailEventProcessor} dependencies.eventProcessor
|
* @param {EmailEventProcessor} dependencies.eventProcessor
|
||||||
|
* @param {object} dependencies.providers
|
||||||
*/
|
*/
|
||||||
constructor({config, settings, queries, eventProcessor, providers}) {
|
constructor({config, settings, queries, eventProcessor, providers}) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
@ -71,16 +92,25 @@ module.exports = class EmailAnalyticsService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the timestamp of the last event we processed. Defaults to now minus 30 minutes if we have no data yet.
|
* Returns the timestamp of the last non-opened event we processed. Defaults to now minus 30 minutes if we have no data yet.
|
||||||
*/
|
*/
|
||||||
async getLastNonOpenedEventTimestamp() {
|
async getLastNonOpenedEventTimestamp() {
|
||||||
return this.#fetchLatestNonOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(['delivered','failed'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS);
|
return this.#fetchLatestNonOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(this.#fetchLatestNonOpenedData.jobName,['delivered','failed'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the timestamp of the last opened event we processed. Defaults to now minus 30 minutes if we have no data yet.
|
||||||
|
*/
|
||||||
async getLastOpenedEventTimestamp() {
|
async getLastOpenedEventTimestamp() {
|
||||||
return this.#fetchLatestOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(['opened'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS);
|
return this.#fetchLatestOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(this.#fetchLatestOpenedData.jobName,['opened'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches the latest opened events.
|
||||||
|
* @param {Object} options - The options for fetching events.
|
||||||
|
* @param {number} [options.maxEvents=Infinity] - The maximum number of events to fetch.
|
||||||
|
* @returns {Promise<number>} The total number of events fetched.
|
||||||
|
*/
|
||||||
async fetchLatestOpenedEvents({maxEvents = Infinity} = {}) {
|
async fetchLatestOpenedEvents({maxEvents = Infinity} = {}) {
|
||||||
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
|
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
|
||||||
const begin = await this.getLastOpenedEventTimestamp();
|
const begin = await this.getLastOpenedEventTimestamp();
|
||||||
@ -92,16 +122,15 @@ module.exports = class EmailAnalyticsService {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the fetch data object if it doesn't exist yet
|
|
||||||
if (!this.#fetchLatestOpenedData) {
|
|
||||||
this.#fetchLatestOpenedData = {
|
|
||||||
running: false
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
return await this.#fetchEvents(this.#fetchLatestOpenedData, {begin, end, maxEvents, eventTypes: ['opened']});
|
return await this.#fetchEvents(this.#fetchLatestOpenedData, {begin, end, maxEvents, eventTypes: ['opened']});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches the latest non-opened events.
|
||||||
|
* @param {Object} options - The options for fetching events.
|
||||||
|
* @param {number} [options.maxEvents=Infinity] - The maximum number of events to fetch.
|
||||||
|
* @returns {Promise<number>} The total number of events fetched.
|
||||||
|
*/
|
||||||
async fetchLatestNonOpenedEvents({maxEvents = Infinity} = {}) {
|
async fetchLatestNonOpenedEvents({maxEvents = Infinity} = {}) {
|
||||||
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
|
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
|
||||||
const begin = await this.getLastNonOpenedEventTimestamp();
|
const begin = await this.getLastNonOpenedEventTimestamp();
|
||||||
@ -113,13 +142,6 @@ module.exports = class EmailAnalyticsService {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the fetch data object if it doesn't exist yet
|
|
||||||
if (!this.#fetchLatestNonOpenedData) {
|
|
||||||
this.#fetchLatestNonOpenedData = {
|
|
||||||
running: false
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
return await this.#fetchEvents(this.#fetchLatestNonOpenedData, {begin, end, maxEvents, eventTypes: ['delivered', 'failed', 'unsubscribed', 'complained']});
|
return await this.#fetchEvents(this.#fetchLatestNonOpenedData, {begin, end, maxEvents, eventTypes: ['delivered', 'failed', 'unsubscribed', 'complained']});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,7 +158,7 @@ module.exports = class EmailAnalyticsService {
|
|||||||
const end = new Date(
|
const end = new Date(
|
||||||
Math.min(
|
Math.min(
|
||||||
Date.now() - TRUST_THRESHOLD_MS,
|
Date.now() - TRUST_THRESHOLD_MS,
|
||||||
this.#fetchLatestNonOpenedData?.lastBegin?.getTime()
|
this.#fetchLatestNonOpenedData?.lastBegin?.getTime() || Date.now() // Fallback to now if the previous job didn't run, for whatever reason, prevents catastrophic error
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -146,18 +168,15 @@ module.exports = class EmailAnalyticsService {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the fetch data object if it doesn't exist yet
|
|
||||||
if (!this.#fetchMissingData) {
|
|
||||||
this.#fetchMissingData = {
|
|
||||||
running: false
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
return await this.#fetchEvents(this.#fetchMissingData, {begin, end, maxEvents});
|
return await this.#fetchEvents(this.#fetchMissingData, {begin, end, maxEvents});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schedule a new fetch that should happen
|
* Schedule a new fetch for email analytics events.
|
||||||
|
* @param {Object} options - The options for scheduling the fetch.
|
||||||
|
* @param {Date} options.begin - The start date for the scheduled fetch.
|
||||||
|
* @param {Date} options.end - The end date for the scheduled fetch.
|
||||||
|
* @throws {errors.ValidationError} Throws an error if a fetch is already in progress.
|
||||||
*/
|
*/
|
||||||
schedule({begin, end}) {
|
schedule({begin, end}) {
|
||||||
if (this.#fetchScheduledData && this.#fetchScheduledData.running) {
|
if (this.#fetchScheduledData && this.#fetchScheduledData.running) {
|
||||||
@ -168,6 +187,7 @@ module.exports = class EmailAnalyticsService {
|
|||||||
logging.info('[EmailAnalytics] Scheduling fetch from ' + begin.toISOString() + ' until ' + end.toISOString());
|
logging.info('[EmailAnalytics] Scheduling fetch from ' + begin.toISOString() + ' until ' + end.toISOString());
|
||||||
this.#fetchScheduledData = {
|
this.#fetchScheduledData = {
|
||||||
running: false,
|
running: false,
|
||||||
|
jobName: 'email-analytics-scheduled',
|
||||||
schedule: {
|
schedule: {
|
||||||
begin,
|
begin,
|
||||||
end
|
end
|
||||||
@ -175,19 +195,32 @@ module.exports = class EmailAnalyticsService {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancels the scheduled fetch of email analytics events.
|
||||||
|
* If a fetch is currently running, it marks it for cancellation.
|
||||||
|
* If no fetch is running, it clears the scheduled fetch data.
|
||||||
|
* @method cancelScheduled
|
||||||
|
*/
|
||||||
cancelScheduled() {
|
cancelScheduled() {
|
||||||
if (this.#fetchScheduledData) {
|
if (this.#fetchScheduledData) {
|
||||||
if (this.#fetchScheduledData.running) {
|
if (this.#fetchScheduledData.running) {
|
||||||
// Cancel the running fetch
|
// Cancel the running fetch
|
||||||
this.#fetchScheduledData.canceled = true;
|
this.#fetchScheduledData.canceled = true;
|
||||||
} else {
|
} else {
|
||||||
this.#fetchScheduledData = null;
|
this.#fetchScheduledData = {
|
||||||
|
running: false,
|
||||||
|
jobName: 'email-analytics-scheduled'
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Continues fetching the scheduled events (does not start one). Resets the scheduled event when received 0 events.
|
* Continues fetching the scheduled events (does not start one). Resets the scheduled event when received 0 events.
|
||||||
|
* @method fetchScheduled
|
||||||
|
* @param {Object} [options] - The options for fetching scheduled events.
|
||||||
|
* @param {number} [options.maxEvents=Infinity] - The maximum number of events to fetch.
|
||||||
|
* @returns {Promise<number>} The number of events fetched.
|
||||||
*/
|
*/
|
||||||
async fetchScheduled({maxEvents = Infinity} = {}) {
|
async fetchScheduled({maxEvents = Infinity} = {}) {
|
||||||
if (!this.#fetchScheduledData || !this.#fetchScheduledData.schedule) {
|
if (!this.#fetchScheduledData || !this.#fetchScheduledData.schedule) {
|
||||||
@ -212,26 +245,34 @@ module.exports = class EmailAnalyticsService {
|
|||||||
if (end <= begin) {
|
if (end <= begin) {
|
||||||
// Skip for now
|
// Skip for now
|
||||||
logging.info('[EmailAnalytics] Ending fetchScheduled because end is before begin');
|
logging.info('[EmailAnalytics] Ending fetchScheduled because end is before begin');
|
||||||
this.#fetchScheduledData = null;
|
this.#fetchScheduledData = {
|
||||||
|
running: false,
|
||||||
|
jobName: 'email-analytics-scheduled'
|
||||||
|
};
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
const count = await this.#fetchEvents(this.#fetchScheduledData, {begin, end, maxEvents});
|
const count = await this.#fetchEvents(this.#fetchScheduledData, {begin, end, maxEvents});
|
||||||
if (count === 0 || this.#fetchScheduledData.canceled) {
|
if (count === 0 || this.#fetchScheduledData.canceled) {
|
||||||
// Reset the scheduled fetch
|
// Reset the scheduled fetch
|
||||||
this.#fetchScheduledData = null;
|
this.#fetchScheduledData = {
|
||||||
|
running: false,
|
||||||
|
jobName: 'email-analytics-scheduled'
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.queries.setJobTimestamp(this.#fetchScheduledData.jobName, 'completed', this.#fetchScheduledData.lastEventTimestamp);
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start fetching analytics and store the data of the progress inside fetchData
|
* Start fetching analytics and store the data of the progress inside fetchData
|
||||||
* @param {FetchData} fetchData
|
* @param {FetchData} fetchData - Object to store the progress of the fetch operation
|
||||||
* @param {object} options
|
* @param {object} options - Options for fetching events
|
||||||
* @param {Date} options.begin
|
* @param {Date} options.begin - Start date for fetching events
|
||||||
* @param {Date} options.end
|
* @param {Date} options.end - End date for fetching events
|
||||||
* @param {number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
|
* @param {number} [options.maxEvents=Infinity] - Maximum number of events to fetch. Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
|
||||||
* @param {String[]} [options.eventTypes] Only fetch these events, ['delivered', 'opened', 'failed', 'unsubscribed', 'complained']
|
* @param {EmailAnalyticsEvent[]} [options.eventTypes] - Array of event types to fetch. If not provided, Mailgun will return all event types.
|
||||||
|
* @returns {Promise<number>} The number of events fetched
|
||||||
*/
|
*/
|
||||||
async #fetchEvents(fetchData, {begin, end, maxEvents = Infinity, eventTypes = null}) {
|
async #fetchEvents(fetchData, {begin, end, maxEvents = Infinity, eventTypes = null}) {
|
||||||
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
|
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
|
||||||
@ -241,6 +282,7 @@ module.exports = class EmailAnalyticsService {
|
|||||||
fetchData.running = true;
|
fetchData.running = true;
|
||||||
fetchData.lastStarted = new Date();
|
fetchData.lastStarted = new Date();
|
||||||
fetchData.lastBegin = begin;
|
fetchData.lastBegin = begin;
|
||||||
|
this.queries.setJobTimestamp(fetchData.jobName, 'started', begin);
|
||||||
|
|
||||||
let lastAggregation = Date.now();
|
let lastAggregation = Date.now();
|
||||||
let eventCount = 0;
|
let eventCount = 0;
|
||||||
@ -249,6 +291,13 @@ module.exports = class EmailAnalyticsService {
|
|||||||
let processingResult = new EventProcessingResult();
|
let processingResult = new EventProcessingResult();
|
||||||
let error = null;
|
let error = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process a batch of events
|
||||||
|
* @param {Array<Object>} events - Array of event objects to process
|
||||||
|
* @param {EventProcessingResult} processingResult - Object to store the processing results
|
||||||
|
* @param {FetchData} fetchData - Object containing fetch operation data
|
||||||
|
* @returns {Promise<void>}
|
||||||
|
*/
|
||||||
const processBatch = async (events) => {
|
const processBatch = async (events) => {
|
||||||
// Even if the fetching is interrupted because of an error, we still store the last event timestamp
|
// Even if the fetching is interrupted because of an error, we still store the last event timestamp
|
||||||
await this.processEventBatch(events, processingResult, fetchData);
|
await this.processEventBatch(events, processingResult, fetchData);
|
||||||
@ -309,7 +358,14 @@ module.exports = class EmailAnalyticsService {
|
|||||||
// So if we didn't have errors while fetching, and total events < maxEvents, increase lastEventTimestamp with one second
|
// So if we didn't have errors while fetching, and total events < maxEvents, increase lastEventTimestamp with one second
|
||||||
if (!error && eventCount > 0 && eventCount < maxEvents && fetchData.lastEventTimestamp && fetchData.lastEventTimestamp.getTime() < Date.now() - 2000) {
|
if (!error && eventCount > 0 && eventCount < maxEvents && fetchData.lastEventTimestamp && fetchData.lastEventTimestamp.getTime() < Date.now() - 2000) {
|
||||||
logging.info('[EmailAnalytics] Reached end of new events, increasing lastEventTimestamp with one second');
|
logging.info('[EmailAnalytics] Reached end of new events, increasing lastEventTimestamp with one second');
|
||||||
|
// set the data on the db so we can store it for fetching after reboot
|
||||||
|
await this.queries.setJobTimestamp(fetchData.jobName, 'completed', new Date(fetchData.lastEventTimestamp.getTime()));
|
||||||
|
// increment and store in local memory
|
||||||
fetchData.lastEventTimestamp = new Date(fetchData.lastEventTimestamp.getTime() + 1000);
|
fetchData.lastEventTimestamp = new Date(fetchData.lastEventTimestamp.getTime() + 1000);
|
||||||
|
} else {
|
||||||
|
logging.info('[EmailAnalytics] No new events found');
|
||||||
|
// set job status to finished
|
||||||
|
await this.queries.setJobStatus(fetchData.jobName, 'completed');
|
||||||
}
|
}
|
||||||
|
|
||||||
fetchData.running = false;
|
fetchData.running = false;
|
||||||
@ -321,8 +377,11 @@ module.exports = class EmailAnalyticsService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {any[]} events
|
* Process a batch of email analytics events.
|
||||||
* @param {FetchData} fetchData
|
* @param {any[]} events - An array of email analytics events to process.
|
||||||
|
* @param {Object} result - The result object to merge batch processing results into.
|
||||||
|
* @param {FetchData} fetchData - Data related to the current fetch operation.
|
||||||
|
* @returns {Promise<void>}
|
||||||
*/
|
*/
|
||||||
async processEventBatch(events, result, fetchData) {
|
async processEventBatch(events, result, fetchData) {
|
||||||
const processStart = Date.now();
|
const processStart = Date.now();
|
||||||
@ -331,7 +390,7 @@ module.exports = class EmailAnalyticsService {
|
|||||||
|
|
||||||
// Save last event timestamp
|
// Save last event timestamp
|
||||||
if (!fetchData.lastEventTimestamp || (event.timestamp && event.timestamp > fetchData.lastEventTimestamp)) {
|
if (!fetchData.lastEventTimestamp || (event.timestamp && event.timestamp > fetchData.lastEventTimestamp)) {
|
||||||
fetchData.lastEventTimestamp = event.timestamp;
|
fetchData.lastEventTimestamp = event.timestamp; // don't need to keep db in sync; it'll fall back to last completed timestamp anyways
|
||||||
}
|
}
|
||||||
|
|
||||||
result.merge(batchResult);
|
result.merge(batchResult);
|
||||||
@ -437,8 +496,10 @@ module.exports = class EmailAnalyticsService {
|
|||||||
return new EventProcessingResult({unhandled: 1});
|
return new EventProcessingResult({unhandled: 1});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {{emailIds?: string[], memberIds?: string[]}} stats
|
||||||
|
*/
|
||||||
async aggregateStats({emailIds = [], memberIds = []}) {
|
async aggregateStats({emailIds = [], memberIds = []}) {
|
||||||
logging.info(`[EmailAnalytics] Aggregating for ${emailIds.length} emails`);
|
|
||||||
for (const emailId of emailIds) {
|
for (const emailId of emailIds) {
|
||||||
await this.aggregateEmailStats(emailId);
|
await this.aggregateEmailStats(emailId);
|
||||||
}
|
}
|
||||||
@ -449,10 +510,20 @@ module.exports = class EmailAnalyticsService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Aggregate email stats for a given email ID.
|
||||||
|
* @param {string} emailId - The ID of the email to aggregate stats for.
|
||||||
|
* @returns {Promise<void>}
|
||||||
|
*/
|
||||||
async aggregateEmailStats(emailId) {
|
async aggregateEmailStats(emailId) {
|
||||||
return this.queries.aggregateEmailStats(emailId);
|
return this.queries.aggregateEmailStats(emailId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Aggregate member stats for a given member ID.
|
||||||
|
* @param {string} memberId - The ID of the member to aggregate stats for.
|
||||||
|
* @returns {Promise<void>}
|
||||||
|
*/
|
||||||
async aggregateMemberStats(memberId) {
|
async aggregateMemberStats(memberId) {
|
||||||
return this.queries.aggregateMemberStats(memberId);
|
return this.queries.aggregateMemberStats(memberId);
|
||||||
}
|
}
|
||||||
|
@ -16,10 +16,22 @@ describe('EmailAnalyticsService', function () {
|
|||||||
const service = new EmailAnalyticsService({});
|
const service = new EmailAnalyticsService({});
|
||||||
const result = service.getStatus();
|
const result = service.getStatus();
|
||||||
result.should.deepEqual({
|
result.should.deepEqual({
|
||||||
latest: null,
|
latest: {
|
||||||
missing: null,
|
jobName: 'email-analytics-latest-others',
|
||||||
scheduled: null,
|
running: false
|
||||||
latestOpened: null
|
},
|
||||||
|
latestOpened: {
|
||||||
|
jobName: 'email-analytics-latest-opened',
|
||||||
|
running: false
|
||||||
|
},
|
||||||
|
missing: {
|
||||||
|
jobName: 'email-analytics-missing',
|
||||||
|
running: false
|
||||||
|
},
|
||||||
|
scheduled: {
|
||||||
|
jobName: 'email-analytics-scheduled',
|
||||||
|
running: false
|
||||||
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@ -84,7 +96,9 @@ describe('EmailAnalyticsService', function () {
|
|||||||
const fetchLatestSpy = sinon.spy();
|
const fetchLatestSpy = sinon.spy();
|
||||||
const service = new EmailAnalyticsService({
|
const service = new EmailAnalyticsService({
|
||||||
queries: {
|
queries: {
|
||||||
getLastEventTimestamp: sinon.stub().resolves()
|
getLastEventTimestamp: sinon.stub().resolves(),
|
||||||
|
setJobTimestamp: sinon.stub().resolves(),
|
||||||
|
setJobStatus: sinon.stub().resolves()
|
||||||
},
|
},
|
||||||
providers: [{
|
providers: [{
|
||||||
fetchLatest: fetchLatestSpy
|
fetchLatest: fetchLatestSpy
|
||||||
@ -94,13 +108,32 @@ describe('EmailAnalyticsService', function () {
|
|||||||
fetchLatestSpy.calledOnce.should.be.true();
|
fetchLatestSpy.calledOnce.should.be.true();
|
||||||
fetchLatestSpy.getCall(0).args[1].should.have.property('events', ['opened']);
|
fetchLatestSpy.getCall(0).args[1].should.have.property('events', ['opened']);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('quits if the end is before the begin', async function () {
|
||||||
|
const fetchLatestSpy = sinon.spy();
|
||||||
|
const service = new EmailAnalyticsService({
|
||||||
|
queries: {
|
||||||
|
getLastEventTimestamp: sinon.stub().resolves(new Date(Date.now() + 24 * 60 * 60 * 1000)), // 24 hours in the future
|
||||||
|
setJobTimestamp: sinon.stub().resolves(),
|
||||||
|
setJobStatus: sinon.stub().resolves()
|
||||||
|
},
|
||||||
|
providers: [{
|
||||||
|
fetchLatest: fetchLatestSpy
|
||||||
|
}]
|
||||||
|
});
|
||||||
|
await service.fetchLatestOpenedEvents();
|
||||||
|
fetchLatestSpy.calledOnce.should.be.false();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('fetchLatestNonOpenedEvents', function () {
|
describe('fetchLatestNonOpenedEvents', function () {
|
||||||
it('fetches only non-opened events', async function () {
|
it('fetches only non-opened events', async function () {
|
||||||
const fetchLatestSpy = sinon.spy();
|
const fetchLatestSpy = sinon.spy();
|
||||||
const service = new EmailAnalyticsService({
|
const service = new EmailAnalyticsService({
|
||||||
queries: {
|
queries: {
|
||||||
getLastEventTimestamp: sinon.stub().resolves()
|
getLastEventTimestamp: sinon.stub().resolves(),
|
||||||
|
setJobTimestamp: sinon.stub().resolves(),
|
||||||
|
setJobStatus: sinon.stub().resolves()
|
||||||
},
|
},
|
||||||
providers: [{
|
providers: [{
|
||||||
fetchLatest: fetchLatestSpy
|
fetchLatest: fetchLatestSpy
|
||||||
@ -110,293 +143,545 @@ describe('EmailAnalyticsService', function () {
|
|||||||
fetchLatestSpy.calledOnce.should.be.true();
|
fetchLatestSpy.calledOnce.should.be.true();
|
||||||
fetchLatestSpy.getCall(0).args[1].should.have.property('events', ['delivered', 'failed', 'unsubscribed', 'complained']);
|
fetchLatestSpy.getCall(0).args[1].should.have.property('events', ['delivered', 'failed', 'unsubscribed', 'complained']);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('quits if the end is before the begin', async function () {
|
||||||
|
const fetchLatestSpy = sinon.spy();
|
||||||
|
const service = new EmailAnalyticsService({
|
||||||
|
queries: {
|
||||||
|
getLastEventTimestamp: sinon.stub().resolves(new Date(Date.now() + 24 * 60 * 60 * 1000)), // 24 hours in the future
|
||||||
|
setJobTimestamp: sinon.stub().resolves(),
|
||||||
|
setJobStatus: sinon.stub().resolves()
|
||||||
|
},
|
||||||
|
providers: [{
|
||||||
|
fetchLatest: fetchLatestSpy
|
||||||
|
}]
|
||||||
|
});
|
||||||
|
await service.fetchLatestNonOpenedEvents();
|
||||||
|
fetchLatestSpy.calledOnce.should.be.false();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
describe('fetchScheduled', function () {
|
||||||
|
let service;
|
||||||
|
let processEventBatchStub;
|
||||||
|
let aggregateStatsStub;
|
||||||
|
|
||||||
|
beforeEach(function () {
|
||||||
|
service = new EmailAnalyticsService({
|
||||||
|
queries: {
|
||||||
|
setJobTimestamp: sinon.stub().resolves(),
|
||||||
|
setJobStatus: sinon.stub().resolves()
|
||||||
|
},
|
||||||
|
providers: [{
|
||||||
|
fetchLatest: (fn) => {
|
||||||
|
const events = [1,2,3,4,5,6,7,8,9,10];
|
||||||
|
fn(events);
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
});
|
||||||
|
processEventBatchStub = sinon.stub(service, 'processEventBatch').resolves();
|
||||||
|
aggregateStatsStub = sinon.stub(service, 'aggregateStats').resolves();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(function () {
|
||||||
|
sinon.restore();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns 0 when nothing is scheduled', async function () {
|
||||||
|
const result = await service.fetchScheduled();
|
||||||
|
result.should.equal(0);
|
||||||
|
processEventBatchStub.called.should.be.false();
|
||||||
|
aggregateStatsStub.called.should.be.false();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns 0 when fetch is canceled', async function () {
|
||||||
|
service.schedule({
|
||||||
|
begin: new Date(2023, 0, 1),
|
||||||
|
end: new Date(2023, 0, 2)
|
||||||
|
});
|
||||||
|
service.cancelScheduled();
|
||||||
|
const result = await service.fetchScheduled();
|
||||||
|
result.should.equal(0);
|
||||||
|
processEventBatchStub.called.should.be.false();
|
||||||
|
aggregateStatsStub.called.should.be.false();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('fetches events with correct parameters', async function () {
|
||||||
|
service.schedule({
|
||||||
|
begin: new Date(2023, 0, 1),
|
||||||
|
end: new Date(2023, 0, 2)
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = await service.fetchScheduled({maxEvents: 100});
|
||||||
|
|
||||||
|
result.should.equal(10);
|
||||||
|
aggregateStatsStub.calledOnce.should.be.true();
|
||||||
|
processEventBatchStub.calledOnce.should.be.true();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('bails when end date is before begin date', async function () {
|
||||||
|
service.schedule({
|
||||||
|
begin: new Date(2023, 0, 2),
|
||||||
|
end: new Date(2023, 0, 1)
|
||||||
|
});
|
||||||
|
const result = await service.fetchScheduled({maxEvents: 100});
|
||||||
|
result.should.equal(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('resets fetchScheduledData when no events are fetched', async function () {
|
||||||
|
service = new EmailAnalyticsService({
|
||||||
|
queries: {
|
||||||
|
setJobTimestamp: sinon.stub().resolves(),
|
||||||
|
setJobStatus: sinon.stub().resolves()
|
||||||
|
},
|
||||||
|
providers: [{
|
||||||
|
fetchLatest: (fn) => {
|
||||||
|
fn([]);
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
});
|
||||||
|
|
||||||
|
service.schedule({
|
||||||
|
begin: new Date(2023, 0, 1),
|
||||||
|
end: new Date(2023, 0, 2)
|
||||||
|
});
|
||||||
|
const result = await service.fetchScheduled({maxEvents: 100});
|
||||||
|
result.should.equal(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('fetchMissing', function () {
|
||||||
|
it('fetches missing events', async function () {
|
||||||
|
const fetchLatestSpy = sinon.spy();
|
||||||
|
const service = new EmailAnalyticsService({
|
||||||
|
queries: {
|
||||||
|
setJobTimestamp: sinon.stub().resolves(),
|
||||||
|
setJobStatus: sinon.stub().resolves()
|
||||||
|
},
|
||||||
|
providers: [{
|
||||||
|
fetchLatest: fetchLatestSpy
|
||||||
|
}]
|
||||||
|
});
|
||||||
|
await service.fetchMissing();
|
||||||
|
fetchLatestSpy.calledOnce.should.be.true();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('processEventBatch', function () {
|
describe('processEventBatch', function () {
|
||||||
let eventProcessor;
|
describe('with functional processor', function () {
|
||||||
beforeEach(function () {
|
let eventProcessor;
|
||||||
eventProcessor = {};
|
beforeEach(function () {
|
||||||
eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => {
|
eventProcessor = {};
|
||||||
return {
|
eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => {
|
||||||
emailId,
|
return {
|
||||||
emailRecipientId: emailId,
|
emailId,
|
||||||
memberId: 1
|
emailRecipientId: emailId,
|
||||||
};
|
memberId: 1
|
||||||
|
};
|
||||||
|
});
|
||||||
|
eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => {
|
||||||
|
return {
|
||||||
|
emailId,
|
||||||
|
emailRecipientId: emailId,
|
||||||
|
memberId: 1
|
||||||
|
};
|
||||||
|
});
|
||||||
|
eventProcessor.handlePermanentFailed = sinon.stub().callsFake(({emailId}) => {
|
||||||
|
return {
|
||||||
|
emailId,
|
||||||
|
emailRecipientId: emailId,
|
||||||
|
memberId: 1
|
||||||
|
};
|
||||||
|
});
|
||||||
|
eventProcessor.handleTemporaryFailed = sinon.stub().callsFake(({emailId}) => {
|
||||||
|
return {
|
||||||
|
emailId,
|
||||||
|
emailRecipientId: emailId,
|
||||||
|
memberId: 1
|
||||||
|
};
|
||||||
|
});
|
||||||
|
eventProcessor.handleUnsubscribed = sinon.stub().callsFake(({emailId}) => {
|
||||||
|
return {
|
||||||
|
emailId,
|
||||||
|
emailRecipientId: emailId,
|
||||||
|
memberId: 1
|
||||||
|
};
|
||||||
|
});
|
||||||
|
eventProcessor.handleComplained = sinon.stub().callsFake(({emailId}) => {
|
||||||
|
return {
|
||||||
|
emailId,
|
||||||
|
emailRecipientId: emailId,
|
||||||
|
memberId: 1
|
||||||
|
};
|
||||||
|
});
|
||||||
});
|
});
|
||||||
eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => {
|
|
||||||
return {
|
it('uses passed-in event processor', async function () {
|
||||||
emailId,
|
const service = new EmailAnalyticsService({
|
||||||
emailRecipientId: emailId,
|
eventProcessor
|
||||||
memberId: 1
|
});
|
||||||
};
|
|
||||||
});
|
const result = new EventProcessingResult();
|
||||||
eventProcessor.handlePermanentFailed = sinon.stub().callsFake(({emailId}) => {
|
const fetchData = {};
|
||||||
return {
|
await service.processEventBatch([{
|
||||||
emailId,
|
type: 'delivered',
|
||||||
emailRecipientId: emailId,
|
emailId: 1,
|
||||||
memberId: 1
|
timestamp: new Date(1)
|
||||||
};
|
}, {
|
||||||
});
|
type: 'delivered',
|
||||||
eventProcessor.handleTemporaryFailed = sinon.stub().callsFake(({emailId}) => {
|
emailId: 2,
|
||||||
return {
|
timestamp: new Date(2)
|
||||||
emailId,
|
}, {
|
||||||
emailRecipientId: emailId,
|
type: 'opened',
|
||||||
memberId: 1
|
emailId: 1,
|
||||||
};
|
timestamp: new Date(3)
|
||||||
});
|
}], result, fetchData);
|
||||||
eventProcessor.handleUnsubscribed = sinon.stub().callsFake(({emailId}) => {
|
|
||||||
return {
|
eventProcessor.handleDelivered.callCount.should.eql(2);
|
||||||
emailId,
|
eventProcessor.handleOpened.callCount.should.eql(1);
|
||||||
emailRecipientId: emailId,
|
|
||||||
memberId: 1
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
};
|
delivered: 2,
|
||||||
});
|
opened: 1,
|
||||||
eventProcessor.handleComplained = sinon.stub().callsFake(({emailId}) => {
|
unprocessable: 0,
|
||||||
return {
|
emailIds: [1, 2],
|
||||||
emailId,
|
memberIds: [1]
|
||||||
emailRecipientId: emailId,
|
}));
|
||||||
memberId: 1
|
|
||||||
};
|
fetchData.should.deepEqual({
|
||||||
});
|
lastEventTimestamp: new Date(3)
|
||||||
});
|
});
|
||||||
|
|
||||||
it('uses passed-in event processor', async function () {
|
|
||||||
const service = new EmailAnalyticsService({
|
|
||||||
eventProcessor
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const result = new EventProcessingResult();
|
it('handles opened', async function () {
|
||||||
const fetchData = {};
|
const service = new EmailAnalyticsService({
|
||||||
await service.processEventBatch([{
|
eventProcessor
|
||||||
type: 'delivered',
|
});
|
||||||
emailId: 1,
|
|
||||||
timestamp: new Date(1)
|
|
||||||
}, {
|
|
||||||
type: 'delivered',
|
|
||||||
emailId: 2,
|
|
||||||
timestamp: new Date(2)
|
|
||||||
}, {
|
|
||||||
type: 'opened',
|
|
||||||
emailId: 1,
|
|
||||||
timestamp: new Date(3)
|
|
||||||
}], result, fetchData);
|
|
||||||
|
|
||||||
eventProcessor.handleDelivered.callCount.should.eql(2);
|
const result = new EventProcessingResult();
|
||||||
eventProcessor.handleOpened.callCount.should.eql(1);
|
const fetchData = {};
|
||||||
|
|
||||||
result.should.deepEqual(new EventProcessingResult({
|
await service.processEventBatch([{
|
||||||
delivered: 2,
|
type: 'opened',
|
||||||
opened: 1,
|
emailId: 1,
|
||||||
unprocessable: 0,
|
timestamp: new Date(1)
|
||||||
emailIds: [1, 2],
|
}], result, fetchData);
|
||||||
memberIds: [1]
|
|
||||||
}));
|
|
||||||
|
|
||||||
fetchData.should.deepEqual({
|
eventProcessor.handleOpened.calledOnce.should.be.true();
|
||||||
lastEventTimestamp: new Date(3)
|
|
||||||
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
|
delivered: 0,
|
||||||
|
opened: 1,
|
||||||
|
unprocessable: 0,
|
||||||
|
emailIds: [1],
|
||||||
|
memberIds: [1]
|
||||||
|
}));
|
||||||
|
|
||||||
|
fetchData.should.deepEqual({
|
||||||
|
lastEventTimestamp: new Date(1)
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('handles delivered', async function () {
|
||||||
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = new EventProcessingResult();
|
||||||
|
const fetchData = {};
|
||||||
|
|
||||||
|
await service.processEventBatch([{
|
||||||
|
type: 'delivered',
|
||||||
|
emailId: 1,
|
||||||
|
timestamp: new Date(1)
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
|
eventProcessor.handleDelivered.calledOnce.should.be.true();
|
||||||
|
|
||||||
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
|
delivered: 1,
|
||||||
|
opened: 0,
|
||||||
|
unprocessable: 0,
|
||||||
|
emailIds: [1],
|
||||||
|
memberIds: [1]
|
||||||
|
}));
|
||||||
|
|
||||||
|
fetchData.should.deepEqual({
|
||||||
|
lastEventTimestamp: new Date(1)
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('handles failed (permanent)', async function () {
|
||||||
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = new EventProcessingResult();
|
||||||
|
const fetchData = {};
|
||||||
|
|
||||||
|
await service.processEventBatch([{
|
||||||
|
type: 'failed',
|
||||||
|
severity: 'permanent',
|
||||||
|
emailId: 1,
|
||||||
|
timestamp: new Date(1)
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
|
eventProcessor.handlePermanentFailed.calledOnce.should.be.true();
|
||||||
|
|
||||||
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
|
permanentFailed: 1,
|
||||||
|
emailIds: [1],
|
||||||
|
memberIds: [1]
|
||||||
|
}));
|
||||||
|
|
||||||
|
fetchData.should.deepEqual({
|
||||||
|
lastEventTimestamp: new Date(1)
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('handles failed (temporary)', async function () {
|
||||||
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = new EventProcessingResult();
|
||||||
|
const fetchData = {};
|
||||||
|
|
||||||
|
await service.processEventBatch([{
|
||||||
|
type: 'failed',
|
||||||
|
severity: 'temporary',
|
||||||
|
emailId: 1,
|
||||||
|
timestamp: new Date(1)
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
|
eventProcessor.handleTemporaryFailed.calledOnce.should.be.true();
|
||||||
|
|
||||||
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
|
temporaryFailed: 1,
|
||||||
|
emailIds: [1],
|
||||||
|
memberIds: [1]
|
||||||
|
}));
|
||||||
|
|
||||||
|
fetchData.should.deepEqual({
|
||||||
|
lastEventTimestamp: new Date(1)
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('handles unsubscribed', async function () {
|
||||||
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = new EventProcessingResult();
|
||||||
|
const fetchData = {};
|
||||||
|
|
||||||
|
await service.processEventBatch([{
|
||||||
|
type: 'unsubscribed',
|
||||||
|
emailId: 1,
|
||||||
|
timestamp: new Date(1)
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
|
eventProcessor.handleUnsubscribed.calledOnce.should.be.true();
|
||||||
|
eventProcessor.handleDelivered.called.should.be.false();
|
||||||
|
eventProcessor.handleOpened.called.should.be.false();
|
||||||
|
|
||||||
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
|
unsubscribed: 1,
|
||||||
|
emailIds: [1],
|
||||||
|
memberIds: [1]
|
||||||
|
}));
|
||||||
|
|
||||||
|
fetchData.should.deepEqual({
|
||||||
|
lastEventTimestamp: new Date(1)
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('handles complained', async function () {
|
||||||
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = new EventProcessingResult();
|
||||||
|
const fetchData = {};
|
||||||
|
|
||||||
|
await service.processEventBatch([{
|
||||||
|
type: 'complained',
|
||||||
|
emailId: 1,
|
||||||
|
timestamp: new Date(1)
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
|
eventProcessor.handleComplained.calledOnce.should.be.true();
|
||||||
|
eventProcessor.handleDelivered.called.should.be.false();
|
||||||
|
eventProcessor.handleOpened.called.should.be.false();
|
||||||
|
|
||||||
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
|
complained: 1,
|
||||||
|
emailIds: [1],
|
||||||
|
memberIds: [1]
|
||||||
|
}));
|
||||||
|
|
||||||
|
fetchData.should.deepEqual({
|
||||||
|
lastEventTimestamp: new Date(1)
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it(`doens't handle other event types`, async function () {
|
||||||
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = new EventProcessingResult();
|
||||||
|
const fetchData = {};
|
||||||
|
|
||||||
|
await service.processEventBatch([{
|
||||||
|
type: 'notstandard',
|
||||||
|
emailId: 1,
|
||||||
|
timestamp: new Date(1)
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
|
eventProcessor.handleDelivered.called.should.be.false();
|
||||||
|
eventProcessor.handleOpened.called.should.be.false();
|
||||||
|
|
||||||
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
|
unhandled: 1
|
||||||
|
}));
|
||||||
|
|
||||||
|
fetchData.should.deepEqual({
|
||||||
|
lastEventTimestamp: new Date(1)
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('handles opened', async function () {
|
describe('with null processor results', function () {
|
||||||
const service = new EmailAnalyticsService({
|
let eventProcessor;
|
||||||
eventProcessor
|
beforeEach(function () {
|
||||||
|
eventProcessor = {};
|
||||||
|
eventProcessor.handleDelivered = sinon.stub().returns(null);
|
||||||
|
eventProcessor.handleOpened = sinon.stub().returns(null);
|
||||||
|
eventProcessor.handlePermanentFailed = sinon.stub().returns(null);
|
||||||
|
eventProcessor.handleTemporaryFailed = sinon.stub().returns(null);
|
||||||
|
eventProcessor.handleUnsubscribed = sinon.stub().returns(null);
|
||||||
|
eventProcessor.handleComplained = sinon.stub().returns(null);
|
||||||
});
|
});
|
||||||
|
|
||||||
const result = new EventProcessingResult();
|
it('delivered returns unprocessable', async function () {
|
||||||
const fetchData = {};
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
await service.processEventBatch([{
|
const result = new EventProcessingResult();
|
||||||
type: 'opened',
|
const fetchData = {};
|
||||||
emailId: 1,
|
|
||||||
timestamp: new Date(1)
|
|
||||||
}], result, fetchData);
|
|
||||||
|
|
||||||
eventProcessor.handleOpened.calledOnce.should.be.true();
|
await service.processEventBatch([{
|
||||||
|
type: 'delivered',
|
||||||
|
emailId: 1,
|
||||||
|
timestamp: new Date(1)
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
result.should.deepEqual(new EventProcessingResult({
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
delivered: 0,
|
unprocessable: 1
|
||||||
opened: 1,
|
}));
|
||||||
unprocessable: 0,
|
|
||||||
emailIds: [1],
|
|
||||||
memberIds: [1]
|
|
||||||
}));
|
|
||||||
|
|
||||||
fetchData.should.deepEqual({
|
|
||||||
lastEventTimestamp: new Date(1)
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it('handles delivered', async function () {
|
|
||||||
const service = new EmailAnalyticsService({
|
|
||||||
eventProcessor
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const result = new EventProcessingResult();
|
it('opened returns unprocessable', async function () {
|
||||||
const fetchData = {};
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
await service.processEventBatch([{
|
const result = new EventProcessingResult();
|
||||||
type: 'delivered',
|
const fetchData = {};
|
||||||
emailId: 1,
|
|
||||||
timestamp: new Date(1)
|
|
||||||
}], result, fetchData);
|
|
||||||
|
|
||||||
eventProcessor.handleDelivered.calledOnce.should.be.true();
|
await service.processEventBatch([{
|
||||||
|
type: 'opened',
|
||||||
|
emailId: 1,
|
||||||
|
timestamp: new Date(1)
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
result.should.deepEqual(new EventProcessingResult({
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
delivered: 1,
|
unprocessable: 1
|
||||||
opened: 0,
|
}));
|
||||||
unprocessable: 0,
|
|
||||||
emailIds: [1],
|
|
||||||
memberIds: [1]
|
|
||||||
}));
|
|
||||||
|
|
||||||
fetchData.should.deepEqual({
|
|
||||||
lastEventTimestamp: new Date(1)
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it('handles failed (permanent)', async function () {
|
|
||||||
const service = new EmailAnalyticsService({
|
|
||||||
eventProcessor
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const result = new EventProcessingResult();
|
it('failed (permanent) returns unprocessable', async function () {
|
||||||
const fetchData = {};
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
await service.processEventBatch([{
|
const result = new EventProcessingResult();
|
||||||
type: 'failed',
|
const fetchData = {};
|
||||||
severity: 'permanent',
|
|
||||||
emailId: 1,
|
|
||||||
timestamp: new Date(1)
|
|
||||||
}], result, fetchData);
|
|
||||||
|
|
||||||
eventProcessor.handlePermanentFailed.calledOnce.should.be.true();
|
await service.processEventBatch([{
|
||||||
|
type: 'failed',
|
||||||
|
emailId: 1,
|
||||||
|
timestamp: new Date(1),
|
||||||
|
severity: 'permanent'
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
result.should.deepEqual(new EventProcessingResult({
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
permanentFailed: 1,
|
unprocessable: 1
|
||||||
emailIds: [1],
|
}));
|
||||||
memberIds: [1]
|
|
||||||
}));
|
|
||||||
|
|
||||||
fetchData.should.deepEqual({
|
|
||||||
lastEventTimestamp: new Date(1)
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it('handles failed (temporary)', async function () {
|
|
||||||
const service = new EmailAnalyticsService({
|
|
||||||
eventProcessor
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const result = new EventProcessingResult();
|
it('failed (temporary) returns unprocessable', async function () {
|
||||||
const fetchData = {};
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
await service.processEventBatch([{
|
const result = new EventProcessingResult();
|
||||||
type: 'failed',
|
const fetchData = {};
|
||||||
severity: 'temporary',
|
|
||||||
emailId: 1,
|
|
||||||
timestamp: new Date(1)
|
|
||||||
}], result, fetchData);
|
|
||||||
|
|
||||||
eventProcessor.handleTemporaryFailed.calledOnce.should.be.true();
|
await service.processEventBatch([{
|
||||||
|
type: 'failed',
|
||||||
|
emailId: 1,
|
||||||
|
timestamp: new Date(1),
|
||||||
|
severity: 'temporary'
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
result.should.deepEqual(new EventProcessingResult({
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
temporaryFailed: 1,
|
unprocessable: 1
|
||||||
emailIds: [1],
|
}));
|
||||||
memberIds: [1]
|
|
||||||
}));
|
|
||||||
|
|
||||||
fetchData.should.deepEqual({
|
|
||||||
lastEventTimestamp: new Date(1)
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it('handles unsubscribed', async function () {
|
|
||||||
const service = new EmailAnalyticsService({
|
|
||||||
eventProcessor
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const result = new EventProcessingResult();
|
it('unsubscribed returns unprocessable', async function () {
|
||||||
const fetchData = {};
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
await service.processEventBatch([{
|
const result = new EventProcessingResult();
|
||||||
type: 'unsubscribed',
|
const fetchData = {};
|
||||||
emailId: 1,
|
|
||||||
timestamp: new Date(1)
|
|
||||||
}], result, fetchData);
|
|
||||||
|
|
||||||
eventProcessor.handleUnsubscribed.calledOnce.should.be.true();
|
await service.processEventBatch([{
|
||||||
eventProcessor.handleDelivered.called.should.be.false();
|
type: 'unsubscribed',
|
||||||
eventProcessor.handleOpened.called.should.be.false();
|
emailId: 1,
|
||||||
|
timestamp: new Date(1)
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
result.should.deepEqual(new EventProcessingResult({
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
unsubscribed: 1,
|
unprocessable: 1
|
||||||
emailIds: [1],
|
}));
|
||||||
memberIds: [1]
|
|
||||||
}));
|
|
||||||
|
|
||||||
fetchData.should.deepEqual({
|
|
||||||
lastEventTimestamp: new Date(1)
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it('handles complained', async function () {
|
|
||||||
const service = new EmailAnalyticsService({
|
|
||||||
eventProcessor
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const result = new EventProcessingResult();
|
it('complained returns unprocessable', async function () {
|
||||||
const fetchData = {};
|
const service = new EmailAnalyticsService({
|
||||||
|
eventProcessor
|
||||||
|
});
|
||||||
|
|
||||||
await service.processEventBatch([{
|
const result = new EventProcessingResult();
|
||||||
type: 'complained',
|
const fetchData = {};
|
||||||
emailId: 1,
|
|
||||||
timestamp: new Date(1)
|
|
||||||
}], result, fetchData);
|
|
||||||
|
|
||||||
eventProcessor.handleComplained.calledOnce.should.be.true();
|
await service.processEventBatch([{
|
||||||
eventProcessor.handleDelivered.called.should.be.false();
|
type: 'complained',
|
||||||
eventProcessor.handleOpened.called.should.be.false();
|
emailId: 1,
|
||||||
|
timestamp: new Date(1)
|
||||||
|
}], result, fetchData);
|
||||||
|
|
||||||
result.should.deepEqual(new EventProcessingResult({
|
result.should.deepEqual(new EventProcessingResult({
|
||||||
complained: 1,
|
unprocessable: 1
|
||||||
emailIds: [1],
|
}));
|
||||||
memberIds: [1]
|
|
||||||
}));
|
|
||||||
|
|
||||||
fetchData.should.deepEqual({
|
|
||||||
lastEventTimestamp: new Date(1)
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it(`doens't handle other event types`, async function () {
|
|
||||||
const service = new EmailAnalyticsService({
|
|
||||||
eventProcessor
|
|
||||||
});
|
|
||||||
|
|
||||||
const result = new EventProcessingResult();
|
|
||||||
const fetchData = {};
|
|
||||||
|
|
||||||
await service.processEventBatch([{
|
|
||||||
type: 'notstandard',
|
|
||||||
emailId: 1,
|
|
||||||
timestamp: new Date(1)
|
|
||||||
}], result, fetchData);
|
|
||||||
|
|
||||||
eventProcessor.handleDelivered.called.should.be.false();
|
|
||||||
eventProcessor.handleOpened.called.should.be.false();
|
|
||||||
|
|
||||||
result.should.deepEqual(new EventProcessingResult({
|
|
||||||
unhandled: 1
|
|
||||||
}));
|
|
||||||
|
|
||||||
fetchData.should.deepEqual({
|
|
||||||
lastEventTimestamp: new Date(1)
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('processEvent', function () {
|
describe('processEvent', function () {
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user