diff --git a/ghost/core/core/server/services/email-analytics/lib/queries.js b/ghost/core/core/server/services/email-analytics/lib/queries.js index 35c7855411..93185f62aa 100644 --- a/ghost/core/core/server/services/email-analytics/lib/queries.js +++ b/ghost/core/core/server/services/email-analytics/lib/queries.js @@ -1,9 +1,14 @@ const _ = require('lodash'); const debug = require('@tryghost/debug')('services:email-analytics'); const db = require('../../../data/db'); +const logging = require('@tryghost/logging'); +const {default: ObjectID} = require('bson-objectid'); const MIN_EMAIL_COUNT_FOR_OPEN_RATE = 5; +/** @typedef {'email-analytics-latest-opened'|'email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-scheduled'} EmailAnalyticsJobName */ +/** @typedef {'delivered'|'opened'|'failed'} EmailAnalyticsEvent */ + module.exports = { async shouldFetchStats() { // don't fetch stats from Mailgun if we haven't sent any emails @@ -13,31 +18,52 @@ module.exports = { /** * Retrieves the timestamp of the last seen event for the specified email analytics events. - * @param {string[]} events - The email analytics events to consider (default: ['delivered', 'opened', 'failed']). + * @param {EmailAnalyticsJobName} jobName - The name of the job to update. + * @param {EmailAnalyticsEvent[]} [events=['delivered', 'opened', 'failed']] - The email analytics events to consider. * @returns {Promise} The timestamp of the last seen event, or null if no events are found. */ - async getLastEventTimestamp(events = ['delivered', 'opened', 'failed']) { + async getLastEventTimestamp(jobName, events = ['delivered', 'opened', 'failed']) { const startDate = new Date(); + + let maxOpenedAt; + let maxDeliveredAt; + let maxFailedAt; - // separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns - let maxOpenedAt = events.includes('opened') ? 
(await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first()).maxOpenedAt : null; - let maxDeliveredAt = events.includes('delivered') ? (await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first()).maxDeliveredAt : null; - let maxFailedAt = events.includes('failed') ? (await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first()).maxFailedAt : null; + const jobData = await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first(); - if (maxOpenedAt && !(maxOpenedAt instanceof Date)) { - // SQLite returns a string instead of a Date - maxOpenedAt = new Date(maxOpenedAt); + if (jobData) { + debug(`Using job data for ${jobName}`); + const lastJobTimestamp = jobData.finished_at || jobData.started_at; + maxOpenedAt = events.includes('opened') ? lastJobTimestamp : null; + maxDeliveredAt = events.includes('delivered') ? lastJobTimestamp : null; + maxFailedAt = events.includes('failed') ? 
lastJobTimestamp : null; + } else { + debug(`Job data not found for ${jobName}, using email_recipients data`); + logging.info(`Job data not found for ${jobName}, using email_recipients data`); + if (events.includes('opened')) { + maxOpenedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first()).maxOpenedAt; + } + if (events.includes('delivered')) { + maxDeliveredAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first()).maxDeliveredAt; + } + if (events.includes('failed')) { + maxFailedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first()).maxFailedAt; + } + + // Insert a new job row if it doesn't exist + await db.knex('jobs').insert({ + id: new ObjectID().toHexString(), + name: jobName, + started_at: new Date(), + created_at: new Date(), + status: 'started' + }).onConflict('name').ignore(); } - if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) { - // SQLite returns a string instead of a Date - maxDeliveredAt = new Date(maxDeliveredAt); - } - - if (maxFailedAt && !(maxFailedAt instanceof Date)) { - // SQLite returns a string instead of a Date - maxFailedAt = new Date(maxFailedAt); - } + // Convert string dates to Date objects for SQLite compatibility + [maxOpenedAt, maxDeliveredAt, maxFailedAt] = [maxOpenedAt, maxDeliveredAt, maxFailedAt].map(date => ( + date && !(date instanceof Date) ? new Date(date) : date + )); const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]); debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`); @@ -45,6 +71,71 @@ module.exports = { return lastSeenEventTimestamp; }, + /** + * Sets the timestamp of the last seen event for the specified email analytics events. + * @param {EmailAnalyticsJobName} jobName - The name of the job to update. + * @param {'completed'|'started'} field - The field to update. 
+ * @param {Date} date - The timestamp of the last seen event. + * @returns {Promise} Resolves once the row has been written; database errors are caught and only reported through `debug`, never rethrown. + * @description Updates the `finished_at` or `started_at` column of the specified job in the `jobs` table with the provided timestamp, creating the row when no job with that name exists yet. + * This is used to keep track of the last time the job was run to avoid expensive queries following reboot. + */ + async setJobTimestamp(jobName, field, date) { + // Best-effort bookkeeping: a failure is logged in the catch below instead of aborting the caller's fetch + try { + debug(`Setting ${field} timestamp for job ${jobName} to ${date}`); + const updateField = field === 'completed' ? 'finished_at' : 'started_at'; + const status = field === 'completed' ? 'finished' : 'started'; + const result = await db.knex('jobs').update({[updateField]: date, updated_at: new Date(), status: status}).where('name', jobName); + if (result === 0) { + await db.knex('jobs').insert({ + id: new ObjectID().toHexString(), + name: jobName, + [updateField]: date, + created_at: new Date(), + updated_at: date, + status: status + }); + } + } catch (err) { + debug(`Error setting ${field} timestamp for job ${jobName}: ${err.message}`); + } + }, + + /** + * Sets the status of the specified email analytics job. + * @param {EmailAnalyticsJobName} jobName - The name of the job to update. + * @param {'started'|'finished'|'failed'} status - The new status of the job. + * @returns {Promise} + * @description + * Updates the `status` column of the specified job in the `jobs` table with the provided status. + * This is used to keep track of the current state of the job. 
+ */ + async setJobStatus(jobName, status) { + debug(`Setting status for job ${jobName} to ${status}`); + try { + const result = await db.knex('jobs') + .update({ + status: status, + updated_at: new Date() + }) + .where('name', jobName); + + if (result === 0) { + await db.knex('jobs').insert({ + id: new ObjectID().toHexString(), + name: jobName, + status: status, + created_at: new Date(), + updated_at: new Date() + }); + } + } catch (err) { + debug(`Error setting status for job ${jobName}: ${err.message}`); + throw err; + } + }, + async aggregateEmailStats(emailId) { const {totalCount} = await db.knex('emails').select(db.knex.raw('email_count as totalCount')).where('id', emailId).first() || {totalCount: 0}; // use IS NULL here because that will typically match far fewer rows than IS NOT NULL making the query faster @@ -83,4 +174,4 @@ module.exports = { .update(updateQuery) .where('id', memberId); } -}; +}; \ No newline at end of file diff --git a/ghost/email-analytics-service/lib/EmailAnalyticsService.js b/ghost/email-analytics-service/lib/EmailAnalyticsService.js index c6fa792f21..589080b5cc 100644 --- a/ghost/email-analytics-service/lib/EmailAnalyticsService.js +++ b/ghost/email-analytics-service/lib/EmailAnalyticsService.js @@ -9,6 +9,7 @@ const errors = require('@tryghost/errors'); /** * @typedef {object} FetchData * @property {boolean} running + * @property {('email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-latest-opened'|'email-analytics-scheduled')} jobName Name of the job that is running * @property {Date} [lastStarted] Date the last fetch started on * @property {Date} [lastBegin] The begin time used during the last fetch * @property {Date} [lastEventTimestamp] @@ -16,7 +17,11 @@ const errors = require('@tryghost/errors'); */ /** - * @typedef {FetchData & {schedule: {begin: Date, end: Date}}} FetchDataScheduled + * @typedef {FetchData & {schedule?: {begin: Date, end: Date}}} FetchDataScheduled + */ + +/** + * @typedef {'delivered' 
| 'opened' | 'failed' | 'unsubscribed' | 'complained'} EmailAnalyticsEvent */ const TRUST_THRESHOLD_MS = 30 * 60 * 1000; // 30 minutes @@ -32,26 +37,42 @@ module.exports = class EmailAnalyticsService { /** * @type {FetchData} */ - #fetchLatestNonOpenedData = null; + #fetchLatestNonOpenedData = { + running: false, + jobName: 'email-analytics-latest-others' + }; /** * @type {FetchData} */ - #fetchMissingData = null; + #fetchMissingData = { + running: false, + jobName: 'email-analytics-missing' + }; /** * @type {FetchData} */ - #fetchLatestOpenedData = null; + #fetchLatestOpenedData = { + running: false, + jobName: 'email-analytics-latest-opened' + }; /** * @type {FetchDataScheduled} */ - #fetchScheduledData = null; + #fetchScheduledData = { + running: false, + jobName: 'email-analytics-scheduled' + }; /** * @param {object} dependencies + * @param {object} dependencies.config + * @param {object} dependencies.settings + * @param {object} dependencies.queries * @param {EmailEventProcessor} dependencies.eventProcessor + * @param {object} dependencies.providers */ constructor({config, settings, queries, eventProcessor, providers}) { this.config = config; @@ -71,16 +92,25 @@ module.exports = class EmailAnalyticsService { } /** - * Returns the timestamp of the last event we processed. Defaults to now minus 30 minutes if we have no data yet. + * Returns the timestamp of the last non-opened event we processed. Defaults to now minus 30 minutes if we have no data yet. */ async getLastNonOpenedEventTimestamp() { - return this.#fetchLatestNonOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(['delivered','failed'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS); + return this.#fetchLatestNonOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(this.#fetchLatestNonOpenedData.jobName,['delivered','failed'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS); } + /** + * Returns the timestamp of the last opened event we processed. 
Defaults to now minus 30 minutes if we have no data yet. + */ async getLastOpenedEventTimestamp() { - return this.#fetchLatestOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(['opened'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS); + return this.#fetchLatestOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(this.#fetchLatestOpenedData.jobName,['opened'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS); } + /** + * Fetches the latest opened events. + * @param {Object} options - The options for fetching events. + * @param {number} [options.maxEvents=Infinity] - The maximum number of events to fetch. + * @returns {Promise} The total number of events fetched. + */ async fetchLatestOpenedEvents({maxEvents = Infinity} = {}) { // Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available const begin = await this.getLastOpenedEventTimestamp(); @@ -92,16 +122,15 @@ module.exports = class EmailAnalyticsService { return 0; } - // Create the fetch data object if it doesn't exist yet - if (!this.#fetchLatestOpenedData) { - this.#fetchLatestOpenedData = { - running: false - }; - } - return await this.#fetchEvents(this.#fetchLatestOpenedData, {begin, end, maxEvents, eventTypes: ['opened']}); } + /** + * Fetches the latest non-opened events. + * @param {Object} options - The options for fetching events. + * @param {number} [options.maxEvents=Infinity] - The maximum number of events to fetch. + * @returns {Promise} The total number of events fetched. 
+ */ async fetchLatestNonOpenedEvents({maxEvents = Infinity} = {}) { // Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available const begin = await this.getLastNonOpenedEventTimestamp(); @@ -113,13 +142,6 @@ module.exports = class EmailAnalyticsService { return 0; } - // Create the fetch data object if it doesn't exist yet - if (!this.#fetchLatestNonOpenedData) { - this.#fetchLatestNonOpenedData = { - running: false - }; - } - return await this.#fetchEvents(this.#fetchLatestNonOpenedData, {begin, end, maxEvents, eventTypes: ['delivered', 'failed', 'unsubscribed', 'complained']}); } @@ -136,7 +158,7 @@ module.exports = class EmailAnalyticsService { const end = new Date( Math.min( Date.now() - TRUST_THRESHOLD_MS, - this.#fetchLatestNonOpenedData?.lastBegin?.getTime() + this.#fetchLatestNonOpenedData?.lastBegin?.getTime() || Date.now() // Fallback to now if the previous job didn't run, for whatever reason, prevents catastrophic error ) ); @@ -146,18 +168,15 @@ module.exports = class EmailAnalyticsService { return 0; } - // Create the fetch data object if it doesn't exist yet - if (!this.#fetchMissingData) { - this.#fetchMissingData = { - running: false - }; - } - return await this.#fetchEvents(this.#fetchMissingData, {begin, end, maxEvents}); } /** - * Schedule a new fetch that should happen + * Schedule a new fetch for email analytics events. + * @param {Object} options - The options for scheduling the fetch. + * @param {Date} options.begin - The start date for the scheduled fetch. + * @param {Date} options.end - The end date for the scheduled fetch. + * @throws {errors.ValidationError} Throws an error if a fetch is already in progress. 
*/ schedule({begin, end}) { if (this.#fetchScheduledData && this.#fetchScheduledData.running) { @@ -168,6 +187,7 @@ module.exports = class EmailAnalyticsService { logging.info('[EmailAnalytics] Scheduling fetch from ' + begin.toISOString() + ' until ' + end.toISOString()); this.#fetchScheduledData = { running: false, + jobName: 'email-analytics-scheduled', schedule: { begin, end @@ -175,19 +195,32 @@ module.exports = class EmailAnalyticsService { }; } + /** + * Cancels the scheduled fetch of email analytics events. + * If a fetch is currently running, it marks it for cancellation. + * If no fetch is running, it clears the scheduled fetch data. + * @method cancelScheduled + */ cancelScheduled() { if (this.#fetchScheduledData) { if (this.#fetchScheduledData.running) { // Cancel the running fetch this.#fetchScheduledData.canceled = true; } else { - this.#fetchScheduledData = null; + this.#fetchScheduledData = { + running: false, + jobName: 'email-analytics-scheduled' + }; } } } /** * Continues fetching the scheduled events (does not start one). Resets the scheduled event when received 0 events. + * @method fetchScheduled + * @param {Object} [options] - The options for fetching scheduled events. + * @param {number} [options.maxEvents=Infinity] - The maximum number of events to fetch. + * @returns {Promise} The number of events fetched. 
*/ async fetchScheduled({maxEvents = Infinity} = {}) { if (!this.#fetchScheduledData || !this.#fetchScheduledData.schedule) { @@ -212,26 +245,34 @@ module.exports = class EmailAnalyticsService { if (end <= begin) { // Skip for now logging.info('[EmailAnalytics] Ending fetchScheduled because end is before begin'); - this.#fetchScheduledData = null; + this.#fetchScheduledData = { + running: false, + jobName: 'email-analytics-scheduled' + }; return 0; } const count = await this.#fetchEvents(this.#fetchScheduledData, {begin, end, maxEvents}); + const lastEventTimestamp = this.#fetchScheduledData.lastEventTimestamp; // read before the reset below replaces the object and discards it if (count === 0 || this.#fetchScheduledData.canceled) { // Reset the scheduled fetch - this.#fetchScheduledData = null; + this.#fetchScheduledData = { + running: false, + jobName: 'email-analytics-scheduled' + }; } + await this.queries.setJobTimestamp(this.#fetchScheduledData.jobName, 'completed', lastEventTimestamp); return count; } - /** * Start fetching analytics and store the data of the progress inside fetchData - * @param {FetchData} fetchData - * @param {object} options - * @param {Date} options.begin - * @param {Date} options.end - * @param {number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks. - * @param {String[]} [options.eventTypes] Only fetch these events, ['delivered', 'opened', 'failed', 'unsubscribed', 'complained'] + * @param {FetchData} fetchData - Object to store the progress of the fetch operation + * @param {object} options - Options for fetching events + * @param {Date} options.begin - Start date for fetching events + * @param {Date} options.end - End date for fetching events + * @param {number} [options.maxEvents=Infinity] - Maximum number of events to fetch. Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks. 
+ * @param {EmailAnalyticsEvent[]} [options.eventTypes] - Array of event types to fetch. If not provided, Mailgun will return all event types. + * @returns {Promise} The number of events fetched */ async #fetchEvents(fetchData, {begin, end, maxEvents = Infinity, eventTypes = null}) { // Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available @@ -241,6 +282,7 @@ module.exports = class EmailAnalyticsService { fetchData.running = true; fetchData.lastStarted = new Date(); fetchData.lastBegin = begin; + this.queries.setJobTimestamp(fetchData.jobName, 'started', begin); let lastAggregation = Date.now(); let eventCount = 0; @@ -249,6 +291,13 @@ module.exports = class EmailAnalyticsService { let processingResult = new EventProcessingResult(); let error = null; + /** + * Process a batch of events + * @param {Array} events - Array of event objects to process + * @param {EventProcessingResult} processingResult - Object to store the processing results + * @param {FetchData} fetchData - Object containing fetch operation data + * @returns {Promise} + */ const processBatch = async (events) => { // Even if the fetching is interrupted because of an error, we still store the last event timestamp await this.processEventBatch(events, processingResult, fetchData); @@ -309,7 +358,14 @@ module.exports = class EmailAnalyticsService { // So if we didn't have errors while fetching, and total events < maxEvents, increase lastEventTimestamp with one second if (!error && eventCount > 0 && eventCount < maxEvents && fetchData.lastEventTimestamp && fetchData.lastEventTimestamp.getTime() < Date.now() - 2000) { logging.info('[EmailAnalytics] Reached end of new events, increasing lastEventTimestamp with one second'); + // set the data on the db so we can store it for fetching after reboot + await this.queries.setJobTimestamp(fetchData.jobName, 'completed', new Date(fetchData.lastEventTimestamp.getTime())); + // increment and store in 
local memory fetchData.lastEventTimestamp = new Date(fetchData.lastEventTimestamp.getTime() + 1000); + } else { + logging.info('[EmailAnalytics] No new events found'); + // no new timestamp to checkpoint: mark the job 'finished', the same status value setJobTimestamp writes on completion + await this.queries.setJobStatus(fetchData.jobName, 'finished'); } fetchData.running = false; @@ -321,8 +377,11 @@ module.exports = class EmailAnalyticsService { } /** - * @param {any[]} events - * @param {FetchData} fetchData + * Process a batch of email analytics events. + * @param {any[]} events - An array of email analytics events to process. + * @param {Object} result - The result object to merge batch processing results into. + * @param {FetchData} fetchData - Data related to the current fetch operation. + * @returns {Promise} */ async processEventBatch(events, result, fetchData) { const processStart = Date.now(); @@ -331,7 +390,7 @@ module.exports = class EmailAnalyticsService { // Save last event timestamp if (!fetchData.lastEventTimestamp || (event.timestamp && event.timestamp > fetchData.lastEventTimestamp)) { - fetchData.lastEventTimestamp = event.timestamp; + fetchData.lastEventTimestamp = event.timestamp; // don't need to keep db in sync; it'll fall back to last completed timestamp anyways } result.merge(batchResult); @@ -437,8 +496,10 @@ module.exports = class EmailAnalyticsService { return new EventProcessingResult({unhandled: 1}); } + /** + * @param {{emailIds?: string[], memberIds?: string[]}} stats + */ async aggregateStats({emailIds = [], memberIds = []}) { - logging.info(`[EmailAnalytics] Aggregating for ${emailIds.length} emails`); for (const emailId of emailIds) { await this.aggregateEmailStats(emailId); } @@ -449,10 +510,20 @@ module.exports = class EmailAnalyticsService { } } + /** + * Aggregate email stats for a given email ID. + * @param {string} emailId - The ID of the email to aggregate stats for. 
+ * @returns {Promise} + */ async aggregateEmailStats(emailId) { return this.queries.aggregateEmailStats(emailId); } + /** + * Aggregate member stats for a given member ID. + * @param {string} memberId - The ID of the member to aggregate stats for. + * @returns {Promise} + */ async aggregateMemberStats(memberId) { return this.queries.aggregateMemberStats(memberId); } diff --git a/ghost/email-analytics-service/test/email-analytics-service.test.js b/ghost/email-analytics-service/test/email-analytics-service.test.js index b9cd540f03..229e9712ad 100644 --- a/ghost/email-analytics-service/test/email-analytics-service.test.js +++ b/ghost/email-analytics-service/test/email-analytics-service.test.js @@ -16,10 +16,22 @@ describe('EmailAnalyticsService', function () { const service = new EmailAnalyticsService({}); const result = service.getStatus(); result.should.deepEqual({ - latest: null, - missing: null, - scheduled: null, - latestOpened: null + latest: { + jobName: 'email-analytics-latest-others', + running: false + }, + latestOpened: { + jobName: 'email-analytics-latest-opened', + running: false + }, + missing: { + jobName: 'email-analytics-missing', + running: false + }, + scheduled: { + jobName: 'email-analytics-scheduled', + running: false + } }); }); }); @@ -84,7 +96,9 @@ describe('EmailAnalyticsService', function () { const fetchLatestSpy = sinon.spy(); const service = new EmailAnalyticsService({ queries: { - getLastEventTimestamp: sinon.stub().resolves() + getLastEventTimestamp: sinon.stub().resolves(), + setJobTimestamp: sinon.stub().resolves(), + setJobStatus: sinon.stub().resolves() }, providers: [{ fetchLatest: fetchLatestSpy @@ -94,13 +108,32 @@ describe('EmailAnalyticsService', function () { fetchLatestSpy.calledOnce.should.be.true(); fetchLatestSpy.getCall(0).args[1].should.have.property('events', ['opened']); }); + + it('quits if the end is before the begin', async function () { + const fetchLatestSpy = sinon.spy(); + const service = new 
EmailAnalyticsService({ + queries: { + getLastEventTimestamp: sinon.stub().resolves(new Date(Date.now() + 24 * 60 * 60 * 1000)), // 24 hours in the future + setJobTimestamp: sinon.stub().resolves(), + setJobStatus: sinon.stub().resolves() + }, + providers: [{ + fetchLatest: fetchLatestSpy + }] + }); + await service.fetchLatestOpenedEvents(); + fetchLatestSpy.calledOnce.should.be.false(); + }); }); + describe('fetchLatestNonOpenedEvents', function () { it('fetches only non-opened events', async function () { const fetchLatestSpy = sinon.spy(); const service = new EmailAnalyticsService({ queries: { - getLastEventTimestamp: sinon.stub().resolves() + getLastEventTimestamp: sinon.stub().resolves(), + setJobTimestamp: sinon.stub().resolves(), + setJobStatus: sinon.stub().resolves() }, providers: [{ fetchLatest: fetchLatestSpy @@ -110,293 +143,545 @@ describe('EmailAnalyticsService', function () { fetchLatestSpy.calledOnce.should.be.true(); fetchLatestSpy.getCall(0).args[1].should.have.property('events', ['delivered', 'failed', 'unsubscribed', 'complained']); }); + + it('quits if the end is before the begin', async function () { + const fetchLatestSpy = sinon.spy(); + const service = new EmailAnalyticsService({ + queries: { + getLastEventTimestamp: sinon.stub().resolves(new Date(Date.now() + 24 * 60 * 60 * 1000)), // 24 hours in the future + setJobTimestamp: sinon.stub().resolves(), + setJobStatus: sinon.stub().resolves() + }, + providers: [{ + fetchLatest: fetchLatestSpy + }] + }); + await service.fetchLatestNonOpenedEvents(); + fetchLatestSpy.calledOnce.should.be.false(); + }); + }); + describe('fetchScheduled', function () { + let service; + let processEventBatchStub; + let aggregateStatsStub; + + beforeEach(function () { + service = new EmailAnalyticsService({ + queries: { + setJobTimestamp: sinon.stub().resolves(), + setJobStatus: sinon.stub().resolves() + }, + providers: [{ + fetchLatest: (fn) => { + const events = [1,2,3,4,5,6,7,8,9,10]; + fn(events); + } + }] + 
}); + processEventBatchStub = sinon.stub(service, 'processEventBatch').resolves(); + aggregateStatsStub = sinon.stub(service, 'aggregateStats').resolves(); + }); + + afterEach(function () { + sinon.restore(); + }); + + it('returns 0 when nothing is scheduled', async function () { + const result = await service.fetchScheduled(); + result.should.equal(0); + processEventBatchStub.called.should.be.false(); + aggregateStatsStub.called.should.be.false(); + }); + + it('returns 0 when fetch is canceled', async function () { + service.schedule({ + begin: new Date(2023, 0, 1), + end: new Date(2023, 0, 2) + }); + service.cancelScheduled(); + const result = await service.fetchScheduled(); + result.should.equal(0); + processEventBatchStub.called.should.be.false(); + aggregateStatsStub.called.should.be.false(); + }); + + it('fetches events with correct parameters', async function () { + service.schedule({ + begin: new Date(2023, 0, 1), + end: new Date(2023, 0, 2) + }); + + const result = await service.fetchScheduled({maxEvents: 100}); + + result.should.equal(10); + aggregateStatsStub.calledOnce.should.be.true(); + processEventBatchStub.calledOnce.should.be.true(); + }); + + it('bails when end date is before begin date', async function () { + service.schedule({ + begin: new Date(2023, 0, 2), + end: new Date(2023, 0, 1) + }); + const result = await service.fetchScheduled({maxEvents: 100}); + result.should.equal(0); + }); + + it('resets fetchScheduledData when no events are fetched', async function () { + service = new EmailAnalyticsService({ + queries: { + setJobTimestamp: sinon.stub().resolves(), + setJobStatus: sinon.stub().resolves() + }, + providers: [{ + fetchLatest: (fn) => { + fn([]); + } + }] + }); + + service.schedule({ + begin: new Date(2023, 0, 1), + end: new Date(2023, 0, 2) + }); + const result = await service.fetchScheduled({maxEvents: 100}); + result.should.equal(0); + }); + }); + + describe('fetchMissing', function () { + it('fetches missing events', async function 
() { + const fetchLatestSpy = sinon.spy(); + const service = new EmailAnalyticsService({ + queries: { + setJobTimestamp: sinon.stub().resolves(), + setJobStatus: sinon.stub().resolves() + }, + providers: [{ + fetchLatest: fetchLatestSpy + }] + }); + await service.fetchMissing(); + fetchLatestSpy.calledOnce.should.be.true(); + }); }); }); describe('processEventBatch', function () { - let eventProcessor; - beforeEach(function () { - eventProcessor = {}; - eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; + describe('with functional processor', function () { + let eventProcessor; + beforeEach(function () { + eventProcessor = {}; + eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handlePermanentFailed = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handleTemporaryFailed = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handleUnsubscribed = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handleComplained = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); }); - eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - eventProcessor.handlePermanentFailed = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - eventProcessor.handleTemporaryFailed = 
sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - eventProcessor.handleUnsubscribed = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - eventProcessor.handleComplained = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - }); - - it('uses passed-in event processor', async function () { - const service = new EmailAnalyticsService({ - eventProcessor + + it('uses passed-in event processor', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + await service.processEventBatch([{ + type: 'delivered', + emailId: 1, + timestamp: new Date(1) + }, { + type: 'delivered', + emailId: 2, + timestamp: new Date(2) + }, { + type: 'opened', + emailId: 1, + timestamp: new Date(3) + }], result, fetchData); + + eventProcessor.handleDelivered.callCount.should.eql(2); + eventProcessor.handleOpened.callCount.should.eql(1); + + result.should.deepEqual(new EventProcessingResult({ + delivered: 2, + opened: 1, + unprocessable: 0, + emailIds: [1, 2], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(3) + }); }); - const result = new EventProcessingResult(); - const fetchData = {}; - await service.processEventBatch([{ - type: 'delivered', - emailId: 1, - timestamp: new Date(1) - }, { - type: 'delivered', - emailId: 2, - timestamp: new Date(2) - }, { - type: 'opened', - emailId: 1, - timestamp: new Date(3) - }], result, fetchData); + it('handles opened', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); - eventProcessor.handleDelivered.callCount.should.eql(2); - eventProcessor.handleOpened.callCount.should.eql(1); + const result = new EventProcessingResult(); + const fetchData = {}; - result.should.deepEqual(new 
EventProcessingResult({ - delivered: 2, - opened: 1, - unprocessable: 0, - emailIds: [1, 2], - memberIds: [1] - })); + await service.processEventBatch([{ + type: 'opened', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(3) + eventProcessor.handleOpened.calledOnce.should.be.true(); + + result.should.deepEqual(new EventProcessingResult({ + delivered: 0, + opened: 1, + unprocessable: 0, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it('handles delivered', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + + await service.processEventBatch([{ + type: 'delivered', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handleDelivered.calledOnce.should.be.true(); + + result.should.deepEqual(new EventProcessingResult({ + delivered: 1, + opened: 0, + unprocessable: 0, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it('handles failed (permanent)', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + + await service.processEventBatch([{ + type: 'failed', + severity: 'permanent', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handlePermanentFailed.calledOnce.should.be.true(); + + result.should.deepEqual(new EventProcessingResult({ + permanentFailed: 1, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it('handles failed (temporary)', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; 
+ + await service.processEventBatch([{ + type: 'failed', + severity: 'temporary', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handleTemporaryFailed.calledOnce.should.be.true(); + + result.should.deepEqual(new EventProcessingResult({ + temporaryFailed: 1, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it('handles unsubscribed', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + + await service.processEventBatch([{ + type: 'unsubscribed', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handleUnsubscribed.calledOnce.should.be.true(); + eventProcessor.handleDelivered.called.should.be.false(); + eventProcessor.handleOpened.called.should.be.false(); + + result.should.deepEqual(new EventProcessingResult({ + unsubscribed: 1, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it('handles complained', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + + await service.processEventBatch([{ + type: 'complained', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handleComplained.calledOnce.should.be.true(); + eventProcessor.handleDelivered.called.should.be.false(); + eventProcessor.handleOpened.called.should.be.false(); + + result.should.deepEqual(new EventProcessingResult({ + complained: 1, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it(`doens't handle other event types`, async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData 
= {}; + + await service.processEventBatch([{ + type: 'notstandard', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handleDelivered.called.should.be.false(); + eventProcessor.handleOpened.called.should.be.false(); + + result.should.deepEqual(new EventProcessingResult({ + unhandled: 1 + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); }); }); - it('handles opened', async function () { - const service = new EmailAnalyticsService({ - eventProcessor + describe('with null processor results', function () { + let eventProcessor; + beforeEach(function () { + eventProcessor = {}; + eventProcessor.handleDelivered = sinon.stub().returns(null); + eventProcessor.handleOpened = sinon.stub().returns(null); + eventProcessor.handlePermanentFailed = sinon.stub().returns(null); + eventProcessor.handleTemporaryFailed = sinon.stub().returns(null); + eventProcessor.handleUnsubscribed = sinon.stub().returns(null); + eventProcessor.handleComplained = sinon.stub().returns(null); }); - const result = new EventProcessingResult(); - const fetchData = {}; + it('delivered returns unprocessable', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); - await service.processEventBatch([{ - type: 'opened', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); + const result = new EventProcessingResult(); + const fetchData = {}; - eventProcessor.handleOpened.calledOnce.should.be.true(); + await service.processEventBatch([{ + type: 'delivered', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); - result.should.deepEqual(new EventProcessingResult({ - delivered: 0, - opened: 1, - unprocessable: 0, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - - it('handles delivered', async function () { - const service = new EmailAnalyticsService({ - eventProcessor + result.should.deepEqual(new EventProcessingResult({ + 
unprocessable: 1 + })); }); - const result = new EventProcessingResult(); - const fetchData = {}; + it('opened returns unprocessable', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); - await service.processEventBatch([{ - type: 'delivered', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); + const result = new EventProcessingResult(); + const fetchData = {}; - eventProcessor.handleDelivered.calledOnce.should.be.true(); + await service.processEventBatch([{ + type: 'opened', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); - result.should.deepEqual(new EventProcessingResult({ - delivered: 1, - opened: 0, - unprocessable: 0, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - - it('handles failed (permanent)', async function () { - const service = new EmailAnalyticsService({ - eventProcessor + result.should.deepEqual(new EventProcessingResult({ + unprocessable: 1 + })); }); - const result = new EventProcessingResult(); - const fetchData = {}; + it('failed (permanent) returns unprocessable', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); - await service.processEventBatch([{ - type: 'failed', - severity: 'permanent', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); + const result = new EventProcessingResult(); + const fetchData = {}; - eventProcessor.handlePermanentFailed.calledOnce.should.be.true(); + await service.processEventBatch([{ + type: 'failed', + emailId: 1, + timestamp: new Date(1), + severity: 'permanent' + }], result, fetchData); - result.should.deepEqual(new EventProcessingResult({ - permanentFailed: 1, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - - it('handles failed (temporary)', async function () { - const service = new EmailAnalyticsService({ - eventProcessor + 
result.should.deepEqual(new EventProcessingResult({ + unprocessable: 1 + })); }); - const result = new EventProcessingResult(); - const fetchData = {}; + it('failed (temporary) returns unprocessable', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); - await service.processEventBatch([{ - type: 'failed', - severity: 'temporary', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); + const result = new EventProcessingResult(); + const fetchData = {}; - eventProcessor.handleTemporaryFailed.calledOnce.should.be.true(); + await service.processEventBatch([{ + type: 'failed', + emailId: 1, + timestamp: new Date(1), + severity: 'temporary' + }], result, fetchData); - result.should.deepEqual(new EventProcessingResult({ - temporaryFailed: 1, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - - it('handles unsubscribed', async function () { - const service = new EmailAnalyticsService({ - eventProcessor + result.should.deepEqual(new EventProcessingResult({ + unprocessable: 1 + })); }); - const result = new EventProcessingResult(); - const fetchData = {}; + it('unsubscribed returns unprocessable', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); - await service.processEventBatch([{ - type: 'unsubscribed', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); + const result = new EventProcessingResult(); + const fetchData = {}; - eventProcessor.handleUnsubscribed.calledOnce.should.be.true(); - eventProcessor.handleDelivered.called.should.be.false(); - eventProcessor.handleOpened.called.should.be.false(); + await service.processEventBatch([{ + type: 'unsubscribed', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); - result.should.deepEqual(new EventProcessingResult({ - unsubscribed: 1, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - 
}); - }); - - it('handles complained', async function () { - const service = new EmailAnalyticsService({ - eventProcessor + result.should.deepEqual(new EventProcessingResult({ + unprocessable: 1 + })); }); - const result = new EventProcessingResult(); - const fetchData = {}; + it('complained returns unprocessable', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); - await service.processEventBatch([{ - type: 'complained', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); + const result = new EventProcessingResult(); + const fetchData = {}; - eventProcessor.handleComplained.calledOnce.should.be.true(); - eventProcessor.handleDelivered.called.should.be.false(); - eventProcessor.handleOpened.called.should.be.false(); + await service.processEventBatch([{ + type: 'complained', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); - result.should.deepEqual(new EventProcessingResult({ - complained: 1, - emailIds: [1], - memberIds: [1] - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) - }); - }); - - it(`doens't handle other event types`, async function () { - const service = new EmailAnalyticsService({ - eventProcessor - }); - - const result = new EventProcessingResult(); - const fetchData = {}; - - await service.processEventBatch([{ - type: 'notstandard', - emailId: 1, - timestamp: new Date(1) - }], result, fetchData); - - eventProcessor.handleDelivered.called.should.be.false(); - eventProcessor.handleOpened.called.should.be.false(); - - result.should.deepEqual(new EventProcessingResult({ - unhandled: 1 - })); - - fetchData.should.deepEqual({ - lastEventTimestamp: new Date(1) + result.should.deepEqual(new EventProcessingResult({ + unprocessable: 1 + })); }); }); }); - + describe('processEvent', function () { });