Ghost/core/server/services/email-analytics/email-analytics.js

const _ = require('lodash');
const EventProcessingResult = require('./lib/event-processing-result');
const EventProcessor = require('./lib/event-processor');
const StatsAggregator = require('./lib/stats-aggregator');
const defaultProviders = require('./providers');
const debug = require('ghost-ignition').debug('services:email-analytics');
// when fetching a batch we should keep a record of which emails have associated
// events so we only aggregate those that are affected
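/**
 * Coordinates fetching analytics events from the configured email providers,
 * processing them into recipient-level updates, and aggregating stats for the
 * affected emails and members.
 */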
class EmailAnalyticsService {
    constructor({config, settings, logging, db, providers, eventProcessor, statsAggregator}) {
        this.config = config;
        this.settings = settings;
        this.logging = logging || console;
        this.db = db;
        this.providers = providers || defaultProviders.init({config, settings, logging});
        this.eventProcessor = eventProcessor || new EventProcessor({db, logging});
        this.statsAggregator = statsAggregator || new StatsAggregator({db, logging});
    }
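
    /**
     * Fetches every available event from every provider and processes them.
     * Skips fetching entirely when there are no emails to track.
     *
     * @returns {Promise<EventProcessingResult>} merged results from all providers
     */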
    async fetchAll() {
        const result = new EventProcessingResult();

        // knex's count() resolves to an array of result rows, so extract the
        // numeric count before comparing rather than comparing the array itself
        const [{emailCount}] = await this.db.knex('emails').count('id as emailCount');
        if (emailCount <= 0) {
            debug('fetchAll: skipping - no emails to track');
            return result;
        }
        const startFetch = new Date();
        debug('fetchAll: starting');

        for (const [, provider] of Object.entries(this.providers)) {
            const providerResults = await provider.fetchAll(this.processEventBatch.bind(this));
            result.merge(providerResults);
        }

        debug(`fetchAll: finished (${Date.now() - startFetch}ms)`);
        return result;
    }
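
    /**
     * Fetches events newer than the last seen event timestamp from each
     * provider, stopping once maxEvents have been processed.
     *
     * @param {Object} [options]
     * @param {number} [options.maxEvents=Infinity] stop fetching once this many events have been processed
     * @returns {Promise<EventProcessingResult>} merged results from the providers queried
     */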
    async fetchLatest({maxEvents = Infinity} = {}) {
        const result = new EventProcessingResult();
        const lastTimestamp = await this.getLastSeenEventTimestamp();

        // same extraction as in fetchAll: count() resolves to an array of rows
        const [{emailCount}] = await this.db.knex('emails').count('id as emailCount');
        if (emailCount <= 0) {
            debug('fetchLatest: skipping - no emails to track');
            return result;
        }
        const startFetch = new Date();
        debug('fetchLatest: starting');

        providersLoop:
        for (const [, provider] of Object.entries(this.providers)) {
            const providerResults = await provider.fetchLatest(lastTimestamp, this.processEventBatch.bind(this), {maxEvents});
            result.merge(providerResults);

            if (result.totalEvents >= maxEvents) {
                break providersLoop;
            }
        }

        debug(`fetchLatest: finished in ${Date.now() - startFetch}ms. Fetched ${result.totalEvents} events`);
        return result;
    }
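
    /**
     * Processes a batch of provider events sequentially, merging the
     * per-event results so callers know which emails/members were affected.
     *
     * @param {Array<Object>} events raw events fetched from a provider
     * @returns {Promise<EventProcessingResult>}
     */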
    async processEventBatch(events) {
        const result = new EventProcessingResult();

        for (const event of events) {
            const batchResult = await this.eventProcessor.process(event);
            result.merge(batchResult);
        }

        return result;
    }
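
    /**
     * Aggregates stats for the emails and members touched by processed events.
     *
     * @param {Object} result
     * @param {Array<string>} [result.emailIds] ids of emails with new events
     * @param {Array<string>} [result.memberIds] ids of members with new events
     */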
    async aggregateStats({emailIds = [], memberIds = []}) {
        for (const emailId of emailIds) {
            await this.aggregateEmailStats(emailId);
        }
        for (const memberId of memberIds) {
            await this.aggregateMemberStats(memberId);
        }
    }
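
    // per-entity aggregation is delegated to the stats aggregator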
    aggregateEmailStats(emailId) {
        return this.statsAggregator.aggregateEmail(emailId);
    }

    aggregateMemberStats(memberId) {
        return this.statsAggregator.aggregateMember(memberId);
    }
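
    /**
     * Finds the newest event timestamp already stored on email_recipients
     * (max of delivered_at/opened_at/failed_at), used as the starting point
     * for fetchLatest.
     *
     * @returns {Promise<Date|undefined>} undefined when no events have been seen yet
     */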
    async getLastSeenEventTimestamp() {
        const startDate = new Date();

        // three separate queries is much faster than using max/greatest across
        // columns with coalesce to handle nulls
        const {maxDeliveredAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
        const {maxOpenedAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
        const {maxFailedAt} = await this.db.knex('email_recipients').select(this.db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};

        const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]);
        debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);

        return lastSeenEventTimestamp;
    }
}
module.exports = EmailAnalyticsService;
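
// Example wiring (illustrative sketch only; this assumes EventProcessingResult
// exposes the affected ids and that this runs inside an async job):
//
//   const service = new EmailAnalyticsService({config, settings, logging, db});
//   const result = await service.fetchLatest({maxEvents: 10000});
//   await service.aggregateStats({emailIds: result.emailIds, memberIds: result.memberIds});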