From 95044e3ba0fb76c23d7b3db2f3b2c904d7242c15 Mon Sep 17 00:00:00 2001 From: Nazar Gargol Date: Thu, 20 Aug 2020 20:24:05 +1200 Subject: [PATCH] Added error handling for failed member imports no issue - When bulk insert fails there is no transactional logic to revert related records form being inserted. Also, previously there were no attempts to "retry" the insert. - To avoid complex retry logic, an iterative one-by-one insert retry approach was taken. If this becomes a bottleneck in the future, the retry algorithm could be improved. - To avoid a lot of code duplication refactored model's `bulkAdd` & `bulkDestroy` methods to use 'bulk-operations' module. - Updated error handling and logging for bulk delete operations. It's very unlikely for error to happen here, but still need to make sure there is a proper logging in place to trace back the failure. - Added debug logs. This should improve debugging experience and performance measurements. - Added handling for unrecognized errors. Handling inspired by current unrecognized error handling by ghost importer -https://github.com/TryGhost/Ghost/blob/10e5d5f3d470ae201072e41213ecf708051f0a8b/core/server/data/importer/importers/data/base.js#L148-L154 --- .../base}/bulk-operations.js | 24 ++- core/server/models/base/index.js | 13 ++ core/server/models/member-stripe-customer.js | 27 --- core/server/models/member.js | 54 ----- .../models/stripe-customer-subscription.js | 27 --- .../server/services/members/importer/index.js | 189 ++++++++++++++---- core/server/translations/en.json | 12 ++ 7 files changed, 192 insertions(+), 154 deletions(-) rename core/server/{services/members/importer => models/base}/bulk-operations.js (70%) diff --git a/core/server/services/members/importer/bulk-operations.js b/core/server/models/base/bulk-operations.js similarity index 70% rename from core/server/services/members/importer/bulk-operations.js rename to core/server/models/base/bulk-operations.js index 68f5411d02..ad333ba65b 100644 --- a/core/server/services/members/importer/bulk-operations.js +++ b/core/server/models/base/bulk-operations.js @@ -1,5 +1,7 @@ const _ = require('lodash'); -const db = require('../../../data/db'); +const errors = require('@tryghost/errors'); +const db = require('../../data/db'); +const logging = require('../../../shared/logging'); const CHUNK_SIZE = 100; @@ -9,8 +11,9 @@ async function insertChunkSequential(table, chunk, result) { await db.knex(table).insert(record); result.successful += 1; } catch (err) { + err.errorDetails = record; result.errors.push(err); - result.unsuccessfulIds.push(record.id); + result.unsuccessfulRecords.push(record); result.unsuccessful += 1; } } @@ -29,7 +32,7 @@ async function insert(table, data) { const result = { successful: 0, unsuccessful: 0, - unsuccessfulIds: [], + unsuccessfulRecords: [], errors: [] }; @@ -41,13 +44,20 @@ async function insert(table, data) { } async function delChunkSequential(table, chunk, result) { - for (const record of chunk) { + for (const id of chunk) { try { - await db.knex(table).where('id', record).del(); + await db.knex(table).where('id', id).del(); result.successful += 1; } catch (err) { - result.errors.push(err); - result.unsuccessfulIds.push(record); + const importError = new errors.DataImportError({ + message: `Failed to remove entry from ${table}`, + context: `Entry id: ${id}`, + err: err + }); + logging.error(importError); + + result.errors.push(importError); + result.unsuccessfulIds.push(id); result.unsuccessful += 1; } } diff --git a/core/server/models/base/index.js b/core/server/models/base/index.js index 9a448eed0b..1c85ac15d3 100644 --- a/core/server/models/base/index.js +++ b/core/server/models/base/index.js @@ -22,6 +22,7 @@ const security = require('@tryghost/security'); const schema = require('../../data/schema'); const urlUtils = require('../../../shared/url-utils'); const validation = require('../../data/validation'); +const bulkOperations = require('./bulk-operations'); const plugins = require('../plugins'); let ghostBookshelf; let proto; @@ -1028,6 +1029,12 @@ ghostBookshelf.Model = ghostBookshelf.Model.extend({ return model.save(null, options); }, + bulkAdd: function bulkAdd(data, tableName) { + tableName = tableName || this.prototype.tableName; + + return bulkOperations.insert(tableName, data); + }, + /** * ### Destroy * Naive destroy @@ -1051,6 +1058,12 @@ ghostBookshelf.Model = ghostBookshelf.Model.extend({ }); }, + bulkDestroy: function bulkDestroy(data, tableName) { + tableName = tableName || this.prototype.tableName; + + return bulkOperations.del(tableName, data); + }, + /** * ### Generate Slug * Create a string to act as the permalink for an object. diff --git a/core/server/models/member-stripe-customer.js b/core/server/models/member-stripe-customer.js index 761168dba1..70976b2548 100644 --- a/core/server/models/member-stripe-customer.js +++ b/core/server/models/member-stripe-customer.js @@ -1,4 +1,3 @@ -const _ = require('lodash'); const ghostBookshelf = require('./base'); const MemberStripeCustomer = ghostBookshelf.Model.extend({ @@ -29,32 +28,6 @@ const MemberStripeCustomer = ghostBookshelf.Model.extend({ return this.add(data, unfilteredOptions); }, - async bulkAdd(data, unfilteredOptions = {}) { - if (!unfilteredOptions.transacting) { - return ghostBookshelf.transaction((transacting) => { - return this.bulkAdd(data, Object.assign({transacting}, unfilteredOptions)); - }); - } - const result = { - successful: 0, - unsuccessful: 0, - errors: [] - }; - - const CHUNK_SIZE = 100; - - for (const chunk of _.chunk(data, CHUNK_SIZE)) { - try { - await ghostBookshelf.knex(this.prototype.tableName).insert(chunk); - result.successful += chunk.length; - } catch (err) { - result.unsuccessful += chunk.length; - result.errors.push(err); - } - } - return result; - }, - add(data, unfilteredOptions = {}) { if (!unfilteredOptions.transacting) { return ghostBookshelf.transaction((transacting) => { diff --git a/core/server/models/member.js b/core/server/models/member.js index ede7cae1b3..6650c16f9f 100644 --- a/core/server/models/member.js +++ b/core/server/models/member.js @@ -1,8 +1,6 @@ const ghostBookshelf = require('./base'); const uuid = require('uuid'); const _ = require('lodash'); -const {i18n} = require('../lib/common'); -const errors = require('@tryghost/errors'); const {sequence} = require('@tryghost/promise'); const config = require('../../shared/config'); const crypto = require('crypto'); @@ -261,58 +259,6 @@ const Member = ghostBookshelf.Model.extend({ return options; }, - async insertChunkSequential(chunk, result, unfilteredOptions) { - for (const member of chunk) { - try { - await (unfilteredOptions.transacting || ghostBookshelf.knex)(this.prototype.tableName).insert(member); - result.successful += 1; - } catch (err) { - if (err.code === 'ER_DUP_ENTRY') { - result.errors.push(new errors.ValidationError({ - message: i18n.t('errors.models.member.memberAlreadyExists.message'), - context: i18n.t('errors.models.member.memberAlreadyExists.context') - })); - } else { - result.errors.push(err); - } - - result.unsuccessfulIds.push(member.id); - result.unsuccessful += 1; - } - } - }, - - async insertChunk(chunk, result, unfilteredOptions) { - try { - await (unfilteredOptions.transacting || ghostBookshelf.knex)(this.prototype.tableName).insert(chunk); - result.successful += chunk.length; - } catch (err) { - await this.insertChunkSequential(chunk, result, unfilteredOptions); - } - }, - - async bulkAdd(data, unfilteredOptions = {}) { - if (!unfilteredOptions.transacting) { - return ghostBookshelf.transaction((transacting) => { - return this.bulkAdd(data, Object.assign({transacting}, unfilteredOptions)); - }); - } - const result = { - successful: 0, - unsuccessful: 0, - unsuccessfulIds: [], - errors: [] - }; - - const CHUNK_SIZE = 100; - - for (const chunk of _.chunk(data, CHUNK_SIZE)) { - await this.insertChunk(chunk, result, unfilteredOptions); - } - - return result; - }, - add(data, unfilteredOptions = {}) { if (!unfilteredOptions.transacting) { return ghostBookshelf.transaction((transacting) => { diff --git a/core/server/models/stripe-customer-subscription.js b/core/server/models/stripe-customer-subscription.js index 8bb5e5736b..6ad94664e2 100644 --- a/core/server/models/stripe-customer-subscription.js +++ b/core/server/models/stripe-customer-subscription.js @@ -1,4 +1,3 @@ -const _ = require('lodash'); const ghostBookshelf = require('./base'); const CURRENCY_SYMBOLS = { @@ -54,32 +53,6 @@ const StripeCustomerSubscription = ghostBookshelf.Model.extend({ })); } return this.add(data, unfilteredOptions); - }, - - async bulkAdd(data, unfilteredOptions = {}) { - if (!unfilteredOptions.transacting) { - return ghostBookshelf.transaction((transacting) => { - return this.bulkAdd(data, Object.assign({transacting}, unfilteredOptions)); - }); - } - const result = { - successful: 0, - unsuccessful: 0, - errors: [] - }; - - const CHUNK_SIZE = 100; - - for (const chunk of _.chunk(data, CHUNK_SIZE)) { - try { - await ghostBookshelf.knex(this.prototype.tableName).insert(chunk); - result.successful += chunk.length; - } catch (err) { - result.unsuccessful += chunk.length; - result.errors.push(err); - } - } - return result; } }); diff --git a/core/server/services/members/importer/index.js b/core/server/services/members/importer/index.js index 74d950a2da..9bc69177c8 100644 --- a/core/server/services/members/importer/index.js +++ b/core/server/services/members/importer/index.js @@ -3,18 +3,26 @@ const uuid = require('uuid'); const ObjectId = require('bson-objectid'); const moment = require('moment-timezone'); const errors = require('@tryghost/errors'); +const debug = require('ghost-ignition').debug('importer:members'); const membersService = require('../index'); -const bulkOperations = require('./bulk-operations'); const models = require('../../../models'); const {i18n} = require('../../../lib/common'); const logging = require('../../../../shared/logging'); -const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) => { - const createInserter = table => data => bulkOperations.insert(table, data); - const createDeleter = table => data => bulkOperations.del(table, data); +const handleUnrecognizedError = (error) => { + if (!errors.utils.isIgnitionError(error)) { + return new errors.DataImportError({ + message: error.message, + context: error.context, + err: error + }); + } else { + return error; + } +}; - const deleteMembers = createDeleter('members'); - const insertLabelAssociations = createInserter('members_labels'); +const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) => { + debug(`Importing members: ${members.length}, labels: ${allLabelModels.length}, import lables: ${importSetLabels.length}, createdBy: ${createdBy}`); let { invalidMembers, @@ -26,16 +34,34 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) = // NOTE: member insertion has to happen before the rest of insertions to handle validation // errors - remove failed members from label/stripe sets + debug(`Starting insert of ${membersToInsert.length} members`); const insertedMembers = await models.Member.bulkAdd(membersToInsert).then((insertResult) => { - if (insertResult.unsuccessfulIds.length) { + if (insertResult.unsuccessfulRecords.length) { + const unsuccessfulIds = insertResult.unsuccessfulRecords.map(r => r.id); + labelAssociationsToInsert = labelAssociationsToInsert - .filter(la => !insertResult.unsuccessfulIds.includes(la.member_id)); + .filter(la => !unsuccessfulIds.includes(la.member_id)); stripeCustomersToFetch = stripeCustomersToFetch - .filter(sc => !insertResult.unsuccessfulIds.includes(sc.member_id)); + .filter(sc => !unsuccessfulIds.includes(sc.member_id)); stripeCustomersToCreate = stripeCustomersToCreate - .filter(sc => !insertResult.unsuccessfulIds.includes(sc.member_id)); + .filter(sc => !unsuccessfulIds.includes(sc.member_id)); + } + + debug(`Finished inserting members with ${insertResult.errors.length} errors`); + if (insertResult.errors.length) { + insertResult.errors = insertResult.errors.map((error) => { + if (error.code === 'ER_DUP_ENTRY') { + return new errors.ValidationError({ + message: i18n.t('errors.models.member.memberAlreadyExists.message'), + context: i18n.t('errors.models.member.memberAlreadyExists.context'), + err: error + }); + } else { + return handleUnrecognizedError(error); + } + }); } return insertResult; @@ -43,16 +69,41 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) = const fetchedStripeCustomersPromise = fetchStripeCustomers(stripeCustomersToFetch); const createdStripeCustomersPromise = createStripeCustomers(stripeCustomersToCreate); - const insertedLabelsPromise = insertLabelAssociations(labelAssociationsToInsert); + + debug(`Starting insert of ${labelAssociationsToInsert.length} label associations`); + const insertedLabelsPromise = models.Base.Model.bulkAdd(labelAssociationsToInsert, 'members_labels') + .then((insertResult) => { + debug(`Finished inserting member label associations with ${insertResult.errors.length} errors`); + return insertResult; + }); const insertedCustomersPromise = Promise.all([ fetchedStripeCustomersPromise, createdStripeCustomersPromise ]).then( ([fetchedStripeCustomers, createdStripeCustomers]) => { - return models.MemberStripeCustomer.bulkAdd( - fetchedStripeCustomers.customersToInsert.concat(createdStripeCustomers.customersToInsert) - ); + const stripeCustomersToInsert = fetchedStripeCustomers.customersToInsert.concat(createdStripeCustomers.customersToInsert); + + debug(`Starting insert of ${stripeCustomersToInsert.length} stripe customers`); + return models.MemberStripeCustomer.bulkAdd(stripeCustomersToInsert).then((insertResult) => { + debug(`Finished inserting stripe customers with ${insertResult.errors.length} errors`); + + if (insertResult.errors.length) { + insertResult.errors = insertResult.errors.map((error) => { + if (error.code === 'ER_DUP_ENTRY') { + return new errors.ValidationError({ + message: i18n.t('errors.models.member_stripe_customer.customerAlreadyExists.message'), + context: i18n.t('errors.models.member_stripe_customer.customerAlreadyExists.context'), + err: error + }); + } else { + return handleUnrecognizedError(error); + } + }); + } + + return insertResult; + }); } ); @@ -61,18 +112,54 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) = createdStripeCustomersPromise, insertedCustomersPromise ]).then( - ([fetchedStripeCustomers, createdStripeCustomers]) => models.StripeCustomerSubscription.bulkAdd( - fetchedStripeCustomers.subscriptionsToInsert.concat(createdStripeCustomers.subscriptionsToInsert) - ) + ([fetchedStripeCustomers, createdStripeCustomers, insertedCustomersResult]) => { + let subscriptionsToInsert = fetchedStripeCustomers.subscriptionsToInsert.concat(createdStripeCustomers.subscriptionsToInsert); + + if (insertedCustomersResult.unsuccessfulRecords.length) { + const unsuccessfulCustomerIds = insertedCustomersResult.unsuccessfulRecords.map(r => r.customer_id); + subscriptionsToInsert = subscriptionsToInsert.filter(s => !unsuccessfulCustomerIds.includes(s.customer_id)); + } + + debug(`Starting insert of ${subscriptionsToInsert.length} stripe customer subscriptions`); + return models.StripeCustomerSubscription.bulkAdd(subscriptionsToInsert) + .then((insertResult) => { + debug(`Finished inserting stripe customer subscriptions with ${insertResult.errors.length} errors`); + + if (insertResult.errors.length) { + insertResult.errors = insertResult.errors.map((error) => { + if (error.code === 'ER_DUP_ENTRY') { + return new errors.ValidationError({ + message: i18n.t('errors.models.stripe_customer_subscription.subscriptionAlreadyExists.message'), + context: i18n.t('errors.models.stripe_customer_subscription.subscriptionAlreadyExists.context'), + err: error + }); + } else { + return handleUnrecognizedError(error); + } + }); + } + + return insertResult; + }); + } ); const deletedMembersPromise = Promise.all([ fetchedStripeCustomersPromise, - createdStripeCustomersPromise + createdStripeCustomersPromise, + insertedCustomersPromise, + insertedSubscriptionsPromise ]).then( - ([fetchedStripeCustomers, createdStripeCustomers]) => deleteMembers( - fetchedStripeCustomers.membersToDelete.concat(createdStripeCustomers.membersToDelete) - ) + ([fetchedStripeCustomers, createdStripeCustomers, insertedStripeCustomers, insertedStripeSubscriptions]) => { + const memberIds = [ + ...fetchedStripeCustomers.membersToDelete, + ...createdStripeCustomers.membersToDelete, + ...insertedStripeCustomers.unsuccessfulRecords.map(r => r.member_id), + ...insertedStripeSubscriptions.unsuccessfulRecords.map(r => r.member_id) + ]; + + return models.Member.bulkDestroy(memberIds); + } ); // This looks sequential, but at the point insertedCustomersPromise has resolved so have all the others @@ -82,26 +169,33 @@ const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) = const fetchedCustomers = await fetchedStripeCustomersPromise; const insertedLabels = await insertedLabelsPromise; + const allErrors = [ + ...insertedMembers.errors, + ...insertedCustomers.errors, + ...insertedSubscriptions.errors, + ...insertedLabels.errors, + ...fetchedCustomers.errors + ]; + const importedCount = insertedMembers.successful - deletedMembers.successful; + const invalidCount = insertedMembers.unsuccessful + invalidMembers.length + deletedMembers.successful + deletedMembers.unsuccessful; + + debug(`Finished members import with ${importedCount} imported, ${invalidCount} invalid and ${allErrors.length} errors`); + const result = { imported: { - count: insertedMembers.successful - deletedMembers.successful + count: importedCount }, invalid: { - count: insertedMembers.unsuccessful + deletedMembers.unsuccessful + invalidMembers.length, - errors: [ - ...insertedMembers.errors, - ...insertedCustomers.errors, - ...insertedSubscriptions.errors, - ...insertedLabels.errors, - ...fetchedCustomers.errors - ] + count: invalidCount, + errors: allErrors } }; // Allow logging to happen outside of the request cycle process.nextTick(() => { - // @TODO wrap errors with validation errors (or whatever is reasonable) result.invalid.errors.forEach(err => logging.error(err)); + deletedMembers.errors.forEach(err => logging.error(err)); + insertedLabels.errors.forEach(err => logging.error(err)); }); return result; @@ -231,6 +325,7 @@ async function createStripeCustomers(stripeCustomersToCreate) { membersToDelete: [] }; + debug(`Creating Stripe customers for ${stripeCustomersToCreate.length} records`); await Promise.all(stripeCustomersToCreate.map(async function createStripeCustomer(customerToCreate) { try { const customer = await membersService.api.members.createStripeCustomer({ @@ -274,15 +369,22 @@ async function createStripeCustomers(stripeCustomersToCreate) { }); } catch (error) { if (error.message.indexOf('customer') && error.code === 'resource_missing') { - error.message = `Member not imported. ${error.message}`; - error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context'); - error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help'); + result.errors.push(new errors.NotFoundError({ + message: `Member not imported. ${error.message}`, + context: i18n.t('errors.api.members.stripeCustomerNotFound.context'), + help: i18n.t('errors.api.members.stripeCustomerNotFound.help'), + err: error, + errorDetails: JSON.stringify(customerToCreate) + })); + } else { + result.errors.push(handleUnrecognizedError(error)); } - result.errors.push(error); + result.membersToDelete.push(customerToCreate.member_id); } })); + debug(`Finished creating Stripe customers with ${result.errors.length} errors`); return result; } @@ -294,6 +396,8 @@ async function fetchStripeCustomers(stripeCustomersToInsert) { membersToDelete: [] }; + debug(`Fetching Stripe customers for ${stripeCustomersToInsert.length} records`); + await Promise.all(stripeCustomersToInsert.map(async function fetchStripeCustomer(customer) { try { const fetchedCustomer = await membersService.api.members.getStripeCustomer(customer.customer_id, { @@ -336,15 +440,22 @@ async function fetchStripeCustomers(stripeCustomersToInsert) { }); } catch (error) { if (error.message.indexOf('customer') && error.code === 'resource_missing') { - error.message = `Member not imported. ${error.message}`; - error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context'); - error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help'); + result.errors.push(new errors.NotFoundError({ + message: `Member not imported. ${error.message}`, + context: i18n.t('errors.api.members.stripeCustomerNotFound.context'), + help: i18n.t('errors.api.members.stripeCustomerNotFound.help'), + err: error, + errorDetails: JSON.stringify(customer) + })); + } else { + result.errors.push(handleUnrecognizedError(error)); } - result.errors.push(error); + result.membersToDelete.push(customer.member_id); } })); + debug(`Finished fetching Stripe customers with ${result.errors.length} errors`); return result; } diff --git a/core/server/translations/en.json b/core/server/translations/en.json index c847557262..0a611c52f5 100644 --- a/core/server/translations/en.json +++ b/core/server/translations/en.json @@ -253,6 +253,18 @@ "context": "Attempting to add member with existing email address." } }, + "member_stripe_customer": { + "customerAlreadyExists": { + "message": "Stripe customer already exists", + "context": "Attempting to add Stripe customer with existing Stripe customer id" + } + }, + "stripe_customer_subscription": { + "subscriptionAlreadyExists": { + "message": "Stripe customer subscription already exists", + "context": "Attempting to add Stripe customer subscription with existing Stripe customer" + } + }, "base": { "index": { "missingContext": "missing context"