diff --git a/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js b/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js index c8892f5809..823281f68c 100644 --- a/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js +++ b/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js @@ -57,11 +57,22 @@ class EmailAnalyticsServiceWrapper { }); } - async fetchLatest({maxEvents} = {maxEvents: Infinity}) { - logging.info('[EmailAnalytics] Fetch latest started'); + async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) { + logging.info('[EmailAnalytics] Fetch latest opened events started'); const fetchStartDate = new Date(); - const totalEvents = await this.service.fetchLatest({maxEvents}); + const totalEvents = await this.service.fetchLatestOpenedEvents({maxEvents}); + const fetchEndDate = new Date(); + + logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest opens)`); + return totalEvents; + } + + async fetchLatestNonOpenedEvents({maxEvents} = {maxEvents: Infinity}) { + logging.info('[EmailAnalytics] Fetch latest non-opened events started'); + + const fetchStartDate = new Date(); + const totalEvents = await this.service.fetchLatestNonOpenedEvents({maxEvents}); const fetchEndDate = new Date(); logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest)`); @@ -69,7 +80,7 @@ class EmailAnalyticsServiceWrapper { } async fetchMissing({maxEvents} = {maxEvents: Infinity}) { - logging.info('[EmailAnalytics] Fetch missing started'); + logging.info('[EmailAnalytics] Fetch missing events started'); const fetchStartDate = new Date(); const totalEvents = await this.service.fetchMissing({maxEvents}); @@ -83,7 +94,7 @@ class EmailAnalyticsServiceWrapper { if (maxEvents < 300) { return 0; } - logging.info('[EmailAnalytics] Fetch scheduled started'); + logging.info('[EmailAnalytics] Fetch scheduled events started'); const fetchStartDate = new Date(); const totalEvents = await this.service.fetchScheduled({maxEvents}); @@ -100,13 +111,31 @@ class EmailAnalyticsServiceWrapper { } this.fetching = true; + // NOTE: Data shows we can process ~7500 events per minute on Pro; this can vary locally try { - const c1 = await this.fetchLatest({maxEvents: Infinity}); - const c2 = await this.fetchMissing({maxEvents: Infinity}); + // Prioritize opens since they are the most important (only data directly displayed to users) + await this.fetchLatestOpenedEvents({maxEvents: Infinity}); + + // Set limits on how much we fetch without checkings for opened events. During surge events (following newsletter send) + // we want to make sure we don't spend too much time collecting delivery data. + const c1 = await this.fetchLatestNonOpenedEvents({maxEvents: 20000}); + if (c1 > 15000) { + this.fetching = false; + logging.info('[EmailAnalytics] Restarting fetch due to high event count'); + this.startFetch(); + return; + } + const c2 = await this.fetchMissing({maxEvents: 20000}); + if ((c1 + c2) > 15000) { + this.fetching = false; + logging.info('[EmailAnalytics] Restarting fetch due to high event count'); + this.startFetch(); + return; + } // Only fetch scheduled if we didn't fetch a lot of normal events await this.fetchScheduled({maxEvents: 20000 - c1 - c2}); - + this.fetching = false; } catch (e) { logging.error(e, 'Error while fetching email analytics'); diff --git a/ghost/core/core/server/services/email-analytics/lib/queries.js b/ghost/core/core/server/services/email-analytics/lib/queries.js index fbe2019fdc..f9b619828c 100644 --- a/ghost/core/core/server/services/email-analytics/lib/queries.js +++ b/ghost/core/core/server/services/email-analytics/lib/queries.js @@ -11,30 +11,35 @@ module.exports = { return emailCount && emailCount.count > 0; }, - async getLastSeenEventTimestamp() { + /** + * Retrieves the timestamp of the last seen event for the specified email analytics events. + * @param {string[]} events - The email analytics events to consider (default: ['delivered', 'opened', 'failed']). + * @returns {Promise} The timestamp of the last seen event, or null if no events are found. + */ + async getLastEventTimestamp(events = ['delivered', 'opened', 'failed']) { const startDate = new Date(); - // three separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns - let {maxDeliveredAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {}; - let {maxOpenedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {}; - let {maxFailedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {}; + // separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns + let {maxOpenedAt} = events.includes('opened') ? await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() : null; + let {maxDeliveredAt} = events.includes('delivered') ? await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() : null; + let {maxFailedAt} = events.includes('failed') ? await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() : null; + + if (maxOpenedAt && !(maxOpenedAt instanceof Date)) { + // SQLite returns a string instead of a Date + maxOpenedAt = new Date(maxOpenedAt); + } if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) { // SQLite returns a string instead of a Date maxDeliveredAt = new Date(maxDeliveredAt); } - if (maxOpenedAt && !(maxOpenedAt instanceof Date)) { - // SQLite returns a string instead of a Date - maxOpenedAt = new Date(maxOpenedAt); - } - if (maxFailedAt && !(maxFailedAt instanceof Date)) { // SQLite returns a string instead of a Date maxFailedAt = new Date(maxFailedAt); } - const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]); + const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]); debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`); return lastSeenEventTimestamp; diff --git a/ghost/core/test/integration/services/email-service/email-event-storage.test.js b/ghost/core/test/integration/services/email-service/email-event-storage.test.js index e712f68a83..d88a5455bf 100644 --- a/ghost/core/test/integration/services/email-service/email-event-storage.test.js +++ b/ghost/core/test/integration/services/email-service/email-event-storage.test.js @@ -23,7 +23,7 @@ describe('EmailEventStorage', function () { before(async function () { // Stub queries before boot const queries = require('../../../../core/server/services/email-analytics/lib/queries'); - sinon.stub(queries, 'getLastSeenEventTimestamp').callsFake(async function () { + sinon.stub(queries, 'getLastEventTimestamp').callsFake(async function () { // This is required because otherwise the last event timestamp will be now, and that is too close to NOW to start fetching new events return new Date(2000, 0, 1); }); @@ -78,7 +78,7 @@ describe('EmailEventStorage', function () { // Fire event processing // We use offloading to have correct coverage and usage of worker thread - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestNonOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -125,7 +125,7 @@ describe('EmailEventStorage', function () { assert.equal(initialModel.get('delivered_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestNonOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -170,7 +170,7 @@ describe('EmailEventStorage', function () { assert.equal(initialModel.get('opened_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -250,7 +250,7 @@ describe('EmailEventStorage', function () { assert.notEqual(initialModel.get('delivered_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -346,7 +346,7 @@ describe('EmailEventStorage', function () { assert.notEqual(initialModel.get('delivered_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -439,7 +439,7 @@ describe('EmailEventStorage', function () { assert.notEqual(initialModel.get('failed_at'), null, 'This test requires a failed email recipient'); // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -529,7 +529,7 @@ describe('EmailEventStorage', function () { assert.equal(initialModel.get('failed_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -645,7 +645,7 @@ describe('EmailEventStorage', function () { assert.equal(initialModel.get('failed_at'), null); // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -747,7 +747,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -849,7 +849,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -951,7 +951,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -1015,7 +1015,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -1074,7 +1074,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); // Since this is all event based we should wait for all dispatched events to be completed. @@ -1118,7 +1118,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 1); }); @@ -1132,7 +1132,7 @@ describe('EmailEventStorage', function () { }]; // Fire event processing - const result = await emailAnalytics.fetchLatest(); + const result = await emailAnalytics.fetchLatestOpenedEvents(); assert.equal(result, 0); }); }); diff --git a/ghost/core/test/integration/services/mailgun-email-suppression-list.test.js b/ghost/core/test/integration/services/mailgun-email-suppression-list.test.js index 6d6addb6aa..2fa02c76ec 100644 --- a/ghost/core/test/integration/services/mailgun-email-suppression-list.test.js +++ b/ghost/core/test/integration/services/mailgun-email-suppression-list.test.js @@ -44,7 +44,7 @@ describe('MailgunEmailSuppressionList', function () { recipient })]; - await emailAnalytics.fetchLatest(); + await emailAnalytics.fetchLatestOpenedEvents(); await DomainEvents.allSettled(); const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`); @@ -72,7 +72,7 @@ describe('MailgunEmailSuppressionList', function () { recipient })]; - await emailAnalytics.fetchLatest(); + await emailAnalytics.fetchLatestOpenedEvents(); await DomainEvents.allSettled(); const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`); @@ -100,7 +100,7 @@ describe('MailgunEmailSuppressionList', function () { recipient })]; - await emailAnalytics.fetchLatest(); + await emailAnalytics.fetchLatestOpenedEvents(); await DomainEvents.allSettled(); const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`); @@ -128,7 +128,7 @@ describe('MailgunEmailSuppressionList', function () { recipient })]; - await emailAnalytics.fetchLatest(); + await emailAnalytics.fetchLatestOpenedEvents(); await DomainEvents.allSettled(); const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`); @@ -163,7 +163,7 @@ describe('MailgunEmailSuppressionList', function () { timestamp: Math.round(timestamp.getTime() / 1000) }]; - await emailAnalytics.fetchLatest(); + await emailAnalytics.fetchLatestOpenedEvents(); await DomainEvents.allSettled(); const {body: {members: [memberAfter]}} = await agent.get(`/members/${memberId}`); diff --git a/ghost/email-analytics-provider-mailgun/lib/EmailAnalyticsProviderMailgun.js b/ghost/email-analytics-provider-mailgun/lib/EmailAnalyticsProviderMailgun.js index 4bcfacf3a4..cb748b5dbc 100644 --- a/ghost/email-analytics-provider-mailgun/lib/EmailAnalyticsProviderMailgun.js +++ b/ghost/email-analytics-provider-mailgun/lib/EmailAnalyticsProviderMailgun.js @@ -1,6 +1,6 @@ const MailgunClient = require('@tryghost/mailgun-client'); -const EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained'; +const DEFAULT_EVENT_FILTER = 'delivered OR opened OR failed OR unsubscribed OR complained'; const PAGE_LIMIT = 300; const DEFAULT_TAGS = ['bulk-email']; @@ -26,11 +26,12 @@ class EmailAnalyticsProviderMailgun { * @param {Number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks. * @param {Date} [options.begin] * @param {Date} [options.end] + * @param {String[]} [options.events] */ fetchLatest(batchHandler, options) { const mailgunOptions = { limit: PAGE_LIMIT, - event: EVENT_FILTER, + event: options?.events ? options.events.join(' OR ') : DEFAULT_EVENT_FILTER, tags: this.tags.join(' AND '), begin: options.begin ? options.begin.getTime() / 1000 : undefined, end: options.end ? options.end.getTime() / 1000 : undefined, diff --git a/ghost/email-analytics-provider-mailgun/test/provider-mailgun.test.js b/ghost/email-analytics-provider-mailgun/test/provider-mailgun.test.js index c0cba21531..ca99240ead 100644 --- a/ghost/email-analytics-provider-mailgun/test/provider-mailgun.test.js +++ b/ghost/email-analytics-provider-mailgun/test/provider-mailgun.test.js @@ -155,5 +155,28 @@ describe('EmailAnalyticsProviderMailgun', function () { tags: 'bulk-email AND custom-tag' }, batchHandler, {maxEvents: undefined}); }); + + it('uses provided events when supplied', async function () { + const configStub = sinon.stub(config, 'get'); + configStub.withArgs('bulkEmail').returns({ + mailgun: { + apiKey: 'apiKey', + domain: 'domain.com', + baseUrl: 'https://api.mailgun.net/v3' + } + }); + const mailgunProvider = new EmailAnalyticsProviderMailgun({config, settings}); + + const batchHandler = sinon.spy(); + const mailgunFetchEventsStub = sinon.stub(mailgunProvider.mailgunClient, 'fetchEvents').returns(SAMPLE_EVENTS); + + await mailgunProvider.fetchLatest(batchHandler, {events: ['delivered'], begin: LATEST_TIMESTAMP}); + + sinon.assert.calledWithExactly(mailgunFetchEventsStub, { + ...MAILGUN_OPTIONS, + event: 'delivered', + tags: 'bulk-email' + }, batchHandler, {maxEvents: undefined}); + }); }); }); diff --git a/ghost/email-analytics-service/lib/EmailAnalyticsService.js b/ghost/email-analytics-service/lib/EmailAnalyticsService.js index 32262b6e55..ee43582d0c 100644 --- a/ghost/email-analytics-service/lib/EmailAnalyticsService.js +++ b/ghost/email-analytics-service/lib/EmailAnalyticsService.js @@ -32,13 +32,18 @@ module.exports = class EmailAnalyticsService { /** * @type {FetchData} */ - #fetchLatestData = null; + #fetchLatestNonOpenedData = null; /** * @type {FetchData} */ #fetchMissingData = null; + /** + * @type {FetchData} + */ + #fetchLatestOpenedData = null; + /** * @type {FetchDataScheduled} */ @@ -58,38 +63,64 @@ module.exports = class EmailAnalyticsService { getStatus() { return { - latest: this.#fetchLatestData, + latest: this.#fetchLatestNonOpenedData, missing: this.#fetchMissingData, - scheduled: this.#fetchScheduledData + scheduled: this.#fetchScheduledData, + latestOpened: this.#fetchLatestOpenedData }; } /** * Returns the timestamp of the last event we processed. Defaults to now minus 30 minutes if we have no data yet. */ - async getLastEventTimestamp() { - return this.#fetchLatestData?.lastEventTimestamp ?? (await this.queries.getLastSeenEventTimestamp()) ?? new Date(Date.now() - TRUST_THRESHOLD_MS); + async getLastNonOpenedEventTimestamp() { + return this.#fetchLatestNonOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(['delivered','failed'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS); } - async fetchLatest({maxEvents = Infinity} = {}) { + async getLastOpenedEventTimestamp() { + return this.#fetchLatestOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(['opened'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS); + } + + async fetchLatestOpenedEvents({maxEvents = Infinity} = {}) { // Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available - const begin = await this.getLastEventTimestamp(); - const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // ALways stop at x minutes ago to give Mailgun a bit more time to stabilize storage + const begin = await this.getLastOpenedEventTimestamp(); + const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage if (end <= begin) { // Skip for now - logging.info('[EmailAnalytics] Skipping fetchLatest because end (' + end + ') is before begin (' + begin + ')'); + logging.info('[EmailAnalytics] Skipping fetchLatestOpenedEvents because end (' + end + ') is before begin (' + begin + ')'); return 0; } // Create the fetch data object if it doesn't exist yet - if (!this.#fetchLatestData) { - this.#fetchLatestData = { + if (!this.#fetchLatestOpenedData) { + this.#fetchLatestOpenedData = { running: false }; } - return await this.#fetchEvents(this.#fetchLatestData, {begin, end, maxEvents}); + return await this.#fetchEvents(this.#fetchLatestOpenedData, {begin, end, maxEvents, events: ['opened']}); + } + + async fetchLatestNonOpenedEvents({maxEvents = Infinity} = {}) { + // Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available + const begin = await this.getLastNonOpenedEventTimestamp(); + const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage + + if (end <= begin) { + // Skip for now + logging.info('[EmailAnalytics] Skipping fetchLatestNonOpenedEvents because end (' + end + ') is before begin (' + begin + ')'); + return 0; + } + + // Create the fetch data object if it doesn't exist yet + if (!this.#fetchLatestNonOpenedData) { + this.#fetchLatestNonOpenedData = { + running: false + }; + } + + return await this.#fetchEvents(this.#fetchLatestNonOpenedData, {begin, end, maxEvents, events: ['delivered', 'failed', 'unsubscribed', 'complained']}); } /** @@ -101,11 +132,11 @@ module.exports = class EmailAnalyticsService { // We start where we left of, or 1,5h ago after a server restart const begin = this.#fetchMissingData?.lastEventTimestamp ?? this.#fetchMissingData?.lastBegin ?? new Date(Date.now() - TRUST_THRESHOLD_MS * 3); - // Always stop at the time the fetchLatest started fetching on, or maximum 30 minutes ago + // Always stop at the earlier of the time the fetchLatest started fetching on or 30 minutes ago const end = new Date( Math.min( Date.now() - TRUST_THRESHOLD_MS, - this.#fetchLatestData?.lastBegin?.getTime() + this.#fetchLatestNonOpenedData?.lastBegin?.getTime() ) ); @@ -200,8 +231,9 @@ module.exports = class EmailAnalyticsService { * @param {Date} options.begin * @param {Date} options.end * @param {number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks. + * @param {String[]} [options.eventTypes] Only fetch these events, ['delivered', 'opened', 'failed', 'unsubscribed', 'complained'] */ - async #fetchEvents(fetchData, {begin, end, maxEvents = Infinity}) { + async #fetchEvents(fetchData, {begin, end, maxEvents = Infinity, eventTypes = null}) { // Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available logging.info('[EmailAnalytics] Fetching from ' + begin.toISOString() + ' until ' + end.toISOString() + ' (maxEvents: ' + maxEvents + ')'); @@ -246,7 +278,7 @@ module.exports = class EmailAnalyticsService { try { for (const provider of this.providers) { - await provider.fetchLatest(processBatch, {begin, end, maxEvents}); + await provider.fetchLatest(processBatch, {begin, end, maxEvents, events: eventTypes}); } logging.info('[EmailAnalytics] Fetching finished'); diff --git a/ghost/email-analytics-service/test/email-analytics-service.test.js b/ghost/email-analytics-service/test/email-analytics-service.test.js index 1e59d9ceb2..3926707cc8 100644 --- a/ghost/email-analytics-service/test/email-analytics-service.test.js +++ b/ghost/email-analytics-service/test/email-analytics-service.test.js @@ -10,39 +10,126 @@ const { const EventProcessingResult = require('../lib/EventProcessingResult'); describe('EmailAnalyticsService', function () { - let eventProcessor; - beforeEach(function () { - eventProcessor = {}; - eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; - }); - eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => { - return { - emailId, - emailRecipientId: emailId, - memberId: 1 - }; + describe('getStatus', function () { + it('returns status object', function () { + // these are null because we're not running them before calling this + const service = new EmailAnalyticsService({}); + const result = service.getStatus(); + result.should.deepEqual({ + latest: null, + missing: null, + scheduled: null, + latestOpened: null + }); }); }); - describe('fetchLatest', function () { + describe('getLastNonOpenedEventTimestamp', function () { + it('returns the queried timestamp before the fallback', async function () { + const service = new EmailAnalyticsService({ + queries: { + getLastEventTimestamp: sinon.stub().resolves(new Date(1)) + } + }); + const result = await service.getLastNonOpenedEventTimestamp(); + result.should.eql(new Date(1)); + }); + + it('returns the fallback if nothing is found', async function () { + const service = new EmailAnalyticsService({ + queries: { + getLastEventTimestamp: sinon.stub().resolves(null) + } + }); + + const result = await service.getLastNonOpenedEventTimestamp(); + result.should.eql(new Date(Date.now() - 30 * 60 * 1000)); // should be 30 mins prior + }); + }); + + describe('getLastSeenOpenedEventTimestamp', function () { + it('returns the queried timestamp before the fallback', async function () { + const service = new EmailAnalyticsService({ + queries: { + getLastEventTimestamp: sinon.stub().resolves(new Date(1)) + } + }); + + const result = await service.getLastOpenedEventTimestamp(); + result.should.eql(new Date(1)); + }); + + it('returns the fallback if nothing is found', async function () { + const service = new EmailAnalyticsService({ + queries: { + getLastEventTimestamp: sinon.stub().resolves(null) + } + }); + + const result = await service.getLastOpenedEventTimestamp(); + result.should.eql(new Date(Date.now() - 30 * 60 * 1000)); // should be 30 mins prior + }); + + it.skip('returns the cached value before the fallback', async function () { + }); }); describe('processEventBatch', function () { + let eventProcessor; + beforeEach(function () { + eventProcessor = {}; + eventProcessor.handleDelivered = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handleOpened = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handlePermanentFailed = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handleTemporaryFailed = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handleUnsubscribed = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + eventProcessor.handleComplained = sinon.stub().callsFake(({emailId}) => { + return { + emailId, + emailRecipientId: emailId, + memberId: 1 + }; + }); + }); + it('uses passed-in event processor', async function () { const service = new EmailAnalyticsService({ eventProcessor }); const result = new EventProcessingResult(); - const fetchData = { - - }; + const fetchData = {}; await service.processEventBatch([{ type: 'delivered', emailId: 1, @@ -58,6 +145,7 @@ describe('EmailAnalyticsService', function () { }], result, fetchData); eventProcessor.handleDelivered.callCount.should.eql(2); + eventProcessor.handleOpened.callCount.should.eql(1); result.should.deepEqual(new EventProcessingResult({ delivered: 2, @@ -71,6 +159,207 @@ describe('EmailAnalyticsService', function () { lastEventTimestamp: new Date(3) }); }); + + it('handles opened', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + + await service.processEventBatch([{ + type: 'opened', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handleOpened.calledOnce.should.be.true(); + + result.should.deepEqual(new EventProcessingResult({ + delivered: 0, + opened: 1, + unprocessable: 0, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it('handles delivered', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + + await service.processEventBatch([{ + type: 'delivered', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handleDelivered.calledOnce.should.be.true(); + + result.should.deepEqual(new EventProcessingResult({ + delivered: 1, + opened: 0, + unprocessable: 0, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it('handles failed (permanent)', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + + await service.processEventBatch([{ + type: 'failed', + severity: 'permanent', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handlePermanentFailed.calledOnce.should.be.true(); + + result.should.deepEqual(new EventProcessingResult({ + permanentFailed: 1, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it('handles failed (temporary)', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + + await service.processEventBatch([{ + type: 'failed', + severity: 'temporary', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handleTemporaryFailed.calledOnce.should.be.true(); + + result.should.deepEqual(new EventProcessingResult({ + temporaryFailed: 1, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it('handles unsubscribed', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + + await service.processEventBatch([{ + type: 'unsubscribed', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handleUnsubscribed.calledOnce.should.be.true(); + eventProcessor.handleDelivered.called.should.be.false(); + eventProcessor.handleOpened.called.should.be.false(); + + result.should.deepEqual(new EventProcessingResult({ + unsubscribed: 1, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it('handles complained', async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + + await service.processEventBatch([{ + type: 'complained', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handleComplained.calledOnce.should.be.true(); + eventProcessor.handleDelivered.called.should.be.false(); + eventProcessor.handleOpened.called.should.be.false(); + + result.should.deepEqual(new EventProcessingResult({ + complained: 1, + emailIds: [1], + memberIds: [1] + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + + it(`doens't handle other event types`, async function () { + const service = new EmailAnalyticsService({ + eventProcessor + }); + + const result = new EventProcessingResult(); + const fetchData = {}; + + await service.processEventBatch([{ + type: 'notstandard', + emailId: 1, + timestamp: new Date(1) + }], result, fetchData); + + eventProcessor.handleDelivered.called.should.be.false(); + eventProcessor.handleOpened.called.should.be.false(); + + result.should.deepEqual(new EventProcessingResult({ + unhandled: 1 + })); + + fetchData.should.deepEqual({ + lastEventTimestamp: new Date(1) + }); + }); + }); + + describe('processEvent', function () { }); describe('aggregateStats', function () { @@ -100,4 +389,34 @@ describe('EmailAnalyticsService', function () { service.queries.aggregateMemberStats.calledWith('m-2').should.be.true(); }); }); + + describe('aggregateEmailStats', function () { + it('returns the query result', async function () { + const service = new EmailAnalyticsService({ + queries: { + aggregateEmailStats: sinon.stub().resolves() + } + }); + + await service.aggregateEmailStats('memberId'); + + service.queries.aggregateEmailStats.calledOnce.should.be.true(); + service.queries.aggregateEmailStats.calledWith('memberId').should.be.true; + }); + }); + + describe('aggregateMemberStats', function () { + it('returns the query result', async function () { + const service = new EmailAnalyticsService({ + queries: { + aggregateMemberStats: sinon.stub().resolves() + } + }); + + await service.aggregateMemberStats('memberId'); + + service.queries.aggregateMemberStats.calledOnce.should.be.true(); + service.queries.aggregateMemberStats.calledWith('memberId').should.be.true; + }); + }); });