7bbf644d0d
no issue - added `EmailAnalyticsService` - `.fetchAll()` grabs and processes all available events - `.fetchLatest()` grabs and processes all events since the last seen event timestamp - `EventProcessor` is passed event objects and updates `email_recipients` or `members` records depending on whether the event is analytics or list hygiene - always returns an `EventProcessingResult` instance so that progress can be tracked and merged across individual events, batches (pages of events), and total runs - adds email_id and member_id to the returned result where appropriate so that the stats aggregator can limit processing to data that has changed - sets `email_recipients.{delivered_at, opened_at, failed_at}` for analytics events - sets `members.subscribed = false` for permanent failure/unsubscribed/complained list hygiene events - `StatsAggregator` takes an `EventProcessingResult`-like object containing arrays of email ids and member ids on which to aggregate statistics. - jobs for `fetch-latest` and `fetch-all` ready for use with the JobsService - added `initialiseRecurringJobs()` function to the Ghost bootup procedure that schedules the email analytics "fetch latest" job to run every minute
104 lines
4.0 KiB
JavaScript
104 lines
4.0 KiB
JavaScript
const _ = require('lodash');
|
|
const EventProcessingResult = require('./lib/event-processing-result');
|
|
const EventProcessor = require('./lib/event-processor');
|
|
const StatsAggregator = require('./lib/stats-aggregator');
|
|
const defaultProviders = require('./providers');
|
|
const debug = require('ghost-ignition').debug('services:email-analytics');
|
|
|
|
// when fetching a batch we should keep a record of which emails have associated
|
|
// events so we only aggregate those that are affected
|
|
|
|
/**
 * Orchestrates email analytics: fetches provider events, processes them via
 * the EventProcessor, and aggregates per-email / per-member statistics.
 *
 * Collaborators are injectable for testing; each falls back to a default
 * implementation built from `config`/`settings`/`db`.
 */
class EmailAnalyticsService {
    /**
     * @param {Object} deps
     * @param {Object} deps.config - app config (passed through to default providers)
     * @param {Object} deps.settings - settings service (passed through to default providers)
     * @param {Object} [deps.logging] - logger; defaults to `console`
     * @param {Object} deps.db - database access object exposing a `knex` instance
     * @param {Object} [deps.providers] - map of analytics providers; defaults to `defaultProviders.init(...)`
     * @param {Object} [deps.eventProcessor] - event processor; defaults to `new EventProcessor({db, logging})`
     * @param {Object} [deps.statsAggregator] - stats aggregator; defaults to `new StatsAggregator({db, logging})`
     */
    constructor({config, settings, logging, db, providers, eventProcessor, statsAggregator}) {
        this.config = config;
        this.settings = settings;
        this.logging = logging || console;
        this.db = db;
        this.providers = providers || defaultProviders.init({config, settings, logging});
        this.eventProcessor = eventProcessor || new EventProcessor({db, logging});
        this.statsAggregator = statsAggregator || new StatsAggregator({db, logging});
    }

    /**
     * Fetch and process ALL available events from every provider.
     * Skips entirely when there are no emails to track.
     *
     * @returns {Promise<EventProcessingResult>} merged result across all providers
     */
    async fetchAll() {
        const result = new EventProcessingResult();

        // NOTE(review): knex's `.count()` resolves to a driver-specific rows
        // array rather than a number; this guard relies on that comparison
        // coercing as intended — confirm against the knex version in use.
        const emailCount = await this.db.knex('emails').count();
        if (emailCount <= 0) {
            debug('fetchAll: skipping - no emails to track');
            return result;
        }

        const startFetch = new Date();
        debug('fetchAll: starting');
        for (const [, provider] of Object.entries(this.providers)) {
            const providerResults = await provider.fetchAll(this.processEventBatch.bind(this));
            result.merge(providerResults);
        }
        debug(`fetchAll: finished (${Date.now() - startFetch}ms)`);

        return result;
    }

    /**
     * Fetch and process events newer than the last seen event timestamp,
     * stopping early once `maxEvents` has been reached.
     *
     * @param {Object} [options]
     * @param {number} [options.maxEvents=Infinity] - soft cap on total events across providers
     * @returns {Promise<EventProcessingResult>} merged result across all providers
     */
    async fetchLatest({maxEvents = Infinity} = {}) {
        const result = new EventProcessingResult();
        const lastTimestamp = await this.getLastSeenEventTimestamp();

        const startFetch = new Date();
        debug('fetchLatest: starting');
        for (const [, provider] of Object.entries(this.providers)) {
            const providerResults = await provider.fetchLatest(lastTimestamp, this.processEventBatch.bind(this), {maxEvents});
            result.merge(providerResults);

            // stop querying further providers once we've hit the cap
            if (result.totalEvents >= maxEvents) {
                break;
            }
        }
        debug(`fetchLatest: finished in ${Date.now() - startFetch}ms. Fetched ${result.totalEvents} events`);

        return result;
    }

    /**
     * Process one batch (page) of provider events sequentially.
     *
     * @param {Array<Object>} events - raw provider event objects
     * @returns {Promise<EventProcessingResult>} merged result for the batch
     */
    async processEventBatch(events) {
        const result = new EventProcessingResult();

        for (const event of events) {
            const batchResult = await this.eventProcessor.process(event);
            result.merge(batchResult);
        }

        return result;
    }

    /**
     * Aggregate statistics for the given email and member ids.
     *
     * @param {Object} changes - typically an EventProcessingResult-like object
     * @param {Array<string>} [changes.emailIds=[]]
     * @param {Array<string>} [changes.memberIds=[]]
     * @returns {Promise<void>}
     */
    async aggregateStats({emailIds = [], memberIds = []}) {
        for (const emailId of emailIds) {
            await this.aggregateEmailStats(emailId);
        }
        for (const memberId of memberIds) {
            // FIX: was incorrectly calling aggregateEmailStats(memberId),
            // which routed member ids through the email aggregator and left
            // member stats unaggregated.
            await this.aggregateMemberStats(memberId);
        }
    }

    /**
     * Aggregate stats for a single email.
     *
     * @param {string} emailId
     * @returns {Promise<*>} whatever the stats aggregator resolves with
     */
    aggregateEmailStats(emailId) {
        return this.statsAggregator.aggregateEmail(emailId);
    }

    /**
     * Aggregate stats for a single member.
     *
     * @param {string} memberId
     * @returns {Promise<*>} whatever the stats aggregator resolves with
     */
    aggregateMemberStats(memberId) {
        return this.statsAggregator.aggregateMember(memberId);
    }

    /**
     * Determine the timestamp of the most recent event we have recorded,
     * i.e. the max of delivered/opened/failed timestamps on email_recipients.
     *
     * @returns {Promise<*>} latest timestamp value, or undefined when no events exist
     */
    async getLastSeenEventTimestamp() {
        const startDate = new Date();
        // three separate queries is much faster than using max/greatest across columns with coalesce to handle nulls
        const {maxDeliveredAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
        const {maxOpenedAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
        const {maxFailedAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};
        const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]);
        debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);

        return lastSeenEventTimestamp;
    }
}
|
|
|
|
module.exports = EmailAnalyticsService;
|