api/src/workers/bulkInsert.worker.ts
import * as mongoose from 'mongoose';
import {
Boards,
Companies,
Conformities,
Customers,
Deals,
Fields,
ImportHistory,
Pipelines,
ProductCategories,
Products,
Stages,
Tags,
Tasks,
Tickets,
Users
} from '../db/models';
import { fillSearchTextItem } from '../db/models/boardUtils';
import { IConformityAdd } from '../db/models/definitions/conformities';
import { IUserDocument } from '../db/models/definitions/users';
import { debugWorkers } from '../debuggers';
import { fetchElk } from '../elasticsearch';
import {
clearEmptyValues,
connect,
generatePronoun,
IMPORT_CONTENT_TYPE
} from './utils';
import * as _ from 'underscore';
// tslint:disable-next-line
const { parentPort, workerData } = require('worker_threads');
let cancel = false;
parentPort.once('message', message => {
if (message === 'cancel') {
parentPort.postMessage('Cancelled');
cancel = true;
}
});
const create = async ({
docs,
user,
contentType,
model,
useElkSyncer,
associateContentType,
associateField,
mainAssociateField
}: {
docs: any;
user: IUserDocument;
contentType: string;
model: any;
useElkSyncer: boolean;
associateContentType?: string;
associateField?: string;
mainAssociateField?: string;
}) => {
const {
PRODUCT,
CUSTOMER,
COMPANY,
DEAL,
TASK,
TICKET,
LEAD
} = IMPORT_CONTENT_TYPE;
let objects;
let updated: number = 0;
const conformityCompanyMapping = {};
const conformityCustomerMapping = {};
const associateMapping = {};
const updateDocs: any = [];
let insertDocs: any = [];
const bulkValues: {
primaryEmail: string[];
primaryPhone: string[];
primaryName: string[];
code: string[];
} = {
primaryEmail: [],
primaryPhone: [],
primaryName: [],
code: []
};
const docIdsByPrimaryEmail = {};
const docIdsByPrimaryPhone = {};
const docIdsByPrimaryName = {};
const docIdsByCode = {};
const customFieldsByPrimaryEmail = {};
const customFieldsByPrimaryPhone = {};
const customFieldsByPrimaryName = {};
const customFieldsByCode = {};
let associatedModel: any = null;
if (associateContentType && associateField && mainAssociateField) {
switch (associateContentType) {
case 'customer':
associatedModel = Customers;
case 'lead':
associatedModel = Customers;
break;
case 'company':
associatedModel = Companies;
break;
case 'deal':
associatedModel = Deals;
break;
case 'task':
associatedModel = Tasks;
break;
case 'ticket':
associatedModel = Tickets;
break;
default:
break;
}
}
const generateUpdateDocs = async (
_id,
doc,
prevCustomFieldsData: any = []
) => {
let customFieldsData: Array<{ field: string; value: string }> = [];
updated++;
if (
doc.customFieldsData &&
doc.customFieldsData.length > 0 &&
prevCustomFieldsData.length > 0
) {
doc.customFieldsData.map(data => {
customFieldsData.push({ field: data.field, value: data.value });
});
prevCustomFieldsData.map(data => {
customFieldsData.push({ field: data.field, value: data.value });
});
customFieldsData = _.uniq(customFieldsData, 'field');
doc.customFieldsData = await Fields.prepareCustomFieldsData(
customFieldsData
);
}
updateDocs.push({
updateOne: {
filter: { _id },
update: {
$set: { ...clearEmptyValues(doc), modifiedAt: new Date() }
}
}
});
};
const prepareDocs = async (body, type, collectionDocs) => {
debugWorkers(`prepareDocs called`);
const response = await fetchElk({
action: 'search',
index: type,
body: {
query: { bool: { should: body } },
_source: [
'_id',
'primaryEmail',
'primaryPhone',
'primaryName',
'code',
'customFieldsData'
]
}
});
const collections = (response && response.hits.hits) || [];
for (const collection of collections) {
const doc = collection._source;
if (doc.primaryEmail) {
docIdsByPrimaryEmail[doc.primaryEmail] = collection._id;
customFieldsByPrimaryEmail[doc.primaryEmail] =
doc.customFieldsData || [];
continue;
}
if (doc.primaryPhone) {
docIdsByPrimaryPhone[doc.primaryPhone] = collection._id;
customFieldsByPrimaryPhone[doc.docIdsByPrimaryPhone] =
doc.customFieldsData || [];
continue;
}
if (doc.primaryName) {
docIdsByPrimaryName[doc.primaryName] = collection._id;
customFieldsByPrimaryName[doc.primaryName] = doc.customFieldsData || [];
continue;
}
if (doc.code) {
docIdsByCode[doc.code] = collection._id;
customFieldsByCode[doc.code] = doc.customFieldsData || [];
continue;
}
}
for (const doc of collectionDocs) {
if (doc.primaryEmail && docIdsByPrimaryEmail[doc.primaryEmail]) {
await generateUpdateDocs(
docIdsByPrimaryEmail[doc.primaryEmail],
doc,
customFieldsByPrimaryEmail[doc.primaryEmail]
);
continue;
}
if (doc.primaryPhone && docIdsByPrimaryPhone[doc.primaryPhone]) {
await generateUpdateDocs(
docIdsByPrimaryPhone[doc.primaryPhone],
doc,
customFieldsByPrimaryPhone[doc.primaryPhone]
);
continue;
}
if (doc.primaryName && docIdsByPrimaryName[doc.primaryName]) {
await generateUpdateDocs(
docIdsByPrimaryName[doc.primaryName],
doc,
customFieldsByPrimaryName[doc.customFieldsByPrimaryName]
);
continue;
}
if (doc.code && docIdsByCode[doc.code]) {
await generateUpdateDocs(
docIdsByCode[doc.code],
doc,
customFieldsByCode[doc.code]
);
continue;
}
insertDocs.push(doc);
}
};
const createConformityMapping = async ({
index,
field,
values,
conformityTypeModel,
relType,
isAssociated
}: {
index: number;
field: string;
values: string[];
conformityTypeModel: any;
relType: string;
isAssociated: boolean;
}) => {
if (values.length === 0 && contentType !== relType) {
return;
}
let mapping =
relType === 'customer'
? conformityCustomerMapping
: conformityCompanyMapping;
if (isAssociated) {
mapping = associateMapping;
}
const ids = await conformityTypeModel
.find({ [field]: { $in: values } })
.distinct('_id');
if (ids.length === 0) {
return;
}
for (const id of ids) {
if (!mapping[index]) {
mapping[index] = [];
}
mapping[index].push({
relType,
mainType: contentType === 'lead' ? 'customer' : contentType,
relTypeId: id
});
}
};
if (contentType === CUSTOMER || contentType === LEAD) {
debugWorkers('Worker: Import customer data');
debugWorkers(`useElkSyncer: ${useElkSyncer}`);
for (const doc of docs) {
if (!doc.ownerId && user) {
doc.ownerId = user._id;
}
if (doc.primaryEmail && !doc.emails) {
doc.emails = [doc.primaryEmail];
}
if (doc.primaryPhone && !doc.phones) {
doc.phones = [doc.primaryPhone];
}
// clean custom field values
doc.customFieldsData = await Fields.prepareCustomFieldsData(
doc.customFieldsData
);
if (doc.integrationId) {
doc.relatedIntegrationIds = [doc.integrationId];
}
const { profileScore, searchText, state } = await Customers.calcPSS(doc);
doc.profileScore = profileScore;
doc.searchText = searchText;
doc.state = state;
doc.createdAt = new Date();
doc.modifiedAt = new Date();
bulkValues.primaryEmail.push(doc.primaryEmail);
bulkValues.primaryPhone.push(doc.primaryPhone);
bulkValues.code.push(doc.code);
}
if (useElkSyncer) {
bulkValues.primaryEmail = bulkValues.primaryEmail.filter(value => value);
bulkValues.primaryPhone = bulkValues.primaryPhone.filter(value => value);
bulkValues.code = bulkValues.code.filter(value => value);
const queries: Array<{ terms: { [key: string]: string[] } }> = [];
if (bulkValues.primaryEmail.length > 0) {
queries.push({ terms: { primaryEmail: bulkValues.primaryEmail } });
}
if (bulkValues.primaryPhone.length > 0) {
queries.push({
terms: { 'primaryPhone.raw': bulkValues.primaryPhone }
});
}
if (bulkValues.code.length > 0) {
queries.push({ terms: { 'code.raw': bulkValues.code } });
}
await prepareDocs(queries, 'customers', docs);
} else {
insertDocs = docs;
}
debugWorkers(`Insert doc length: ${insertDocs.length}`);
insertDocs.map(async (doc, docIndex) => {
await createConformityMapping({
index: docIndex,
field: 'primaryName',
values: doc.companiesPrimaryNames || [],
conformityTypeModel: Companies,
relType: 'company',
isAssociated: false
});
if (associateContentType && associateField && mainAssociateField) {
await createConformityMapping({
index: docIndex,
field: mainAssociateField,
values: doc[associateField] || [],
conformityTypeModel: associatedModel,
relType: associateContentType,
isAssociated: true
});
}
});
debugWorkers(`Update doc length: ${updateDocs.length}`);
if (updateDocs.length > 0) {
await Customers.bulkWrite(updateDocs);
}
objects = await Customers.insertMany(insertDocs);
}
if (contentType === COMPANY) {
for (const doc of docs) {
if (!doc.ownerId && user) {
doc.ownerId = user._id;
}
// clean custom field values
doc.customFieldsData = await Fields.prepareCustomFieldsData(
doc.customFieldsData
);
doc.searchText = Companies.fillSearchText(doc);
doc.createdAt = new Date();
doc.modifiedAt = new Date();
bulkValues.primaryName.push(doc.primaryName);
bulkValues.primaryEmail.push(doc.primaryEmail);
bulkValues.primaryPhone.push(doc.primaryPhone);
bulkValues.code.push(doc.code);
}
if (useElkSyncer) {
bulkValues.primaryName = bulkValues.primaryName.filter(value => value);
bulkValues.primaryEmail = bulkValues.primaryEmail.filter(value => value);
bulkValues.primaryPhone = bulkValues.primaryPhone.filter(value => value);
bulkValues.code = bulkValues.code.filter(value => value);
const queries: Array<{ terms: { [key: string]: string[] } }> = [];
if (bulkValues.primaryName.length > 0) {
queries.push({ terms: { 'primaryName.raw': bulkValues.primaryName } });
}
if (bulkValues.primaryEmail.length > 0) {
queries.push({ terms: { primaryEmail: bulkValues.primaryEmail } });
}
if (bulkValues.primaryPhone.length > 0) {
queries.push({
terms: { 'primaryPhone.raw': bulkValues.primaryPhone }
});
}
if (bulkValues.code.length > 0) {
queries.push({ terms: { 'code.raw': bulkValues.code } });
}
await prepareDocs(queries, 'companies', docs);
} else {
insertDocs = docs;
}
insertDocs.map(async (doc, docIndex) => {
await createConformityMapping({
index: docIndex,
field: 'primaryEmail',
values: doc.customersPrimaryEmails || [],
conformityTypeModel: Customers,
relType: 'customer',
isAssociated: false
});
if (associateContentType && associateField && mainAssociateField) {
await createConformityMapping({
index: docIndex,
field: mainAssociateField,
values: doc[associateField] || [],
conformityTypeModel: associatedModel,
relType: associateContentType,
isAssociated: true
});
}
});
if (updateDocs.length > 0) {
await Companies.bulkWrite(updateDocs);
}
objects = await Companies.insertMany(insertDocs);
}
if (contentType === PRODUCT) {
const categoryCodes = docs.map(doc => doc.categoryCode);
const categories = await ProductCategories.find(
{ code: { $in: categoryCodes } },
{ _id: 1, code: 1 }
);
const vendorCodes = docs.map(doc => doc.vendorCode);
const vendors = await Companies.find(
{
$or: [
{ code: { $in: vendorCodes } },
{ primaryEmail: { $in: vendorCodes } },
{ primaryPhone: { $in: vendorCodes } },
{ primaryName: { $in: vendorCodes } }
]
},
{ _id: 1, code: 1, primaryEmail: 1, primaryPhone: 1, primaryName: 1 }
);
if (!categories) {
throw new Error(
'Product & service category not found check categoryCode field'
);
}
for (const doc of docs) {
const category = categories.find(cat => cat.code === doc.categoryCode);
if (category) {
doc.categoryId = category._id;
} else {
throw new Error(
'Product & service category not found check categoryCode field'
);
}
if (doc.vendorCode) {
const vendor = vendors.find(
v =>
v.code === doc.vendorCode ||
v.primaryName === doc.vendorCode ||
v.primaryEmail === doc.vendorCode ||
v.primaryPhone === doc.vendorCode
);
if (vendor) {
doc.vendorId = vendor._id;
} else {
throw new Error(
'Product & service vendor not found check VendorCode field'
);
}
}
doc.unitPrice = parseFloat(
doc.unitPrice ? doc.unitPrice.replace(/,/g, '') : 0
);
doc.customFieldsData = await Fields.prepareCustomFieldsData(
doc.customFieldsData
);
}
objects = await Products.insertMany(docs);
}
if ([DEAL, TASK, TICKET].includes(contentType)) {
const conversationIds = docs
.map(doc => doc.sourceConversationId)
.filter(item => item);
const conversations = await model.find({
sourceConversationId: { $in: conversationIds }
});
if (conversations && conversations.length > 0) {
throw new Error(`Already converted a ${contentType}`);
}
docs.map(async (doc, docIndex) => {
doc.createdAt = new Date();
doc.modifiedAt = new Date();
doc.searchText = fillSearchTextItem(doc);
await createConformityMapping({
index: docIndex,
field: 'primaryEmail',
values: doc.customersPrimaryEmails || [],
conformityTypeModel: Customers,
relType: 'customer',
isAssociated: false
});
await createConformityMapping({
index: docIndex,
field: 'primaryName',
values: doc.companiesPrimaryNames || [],
conformityTypeModel: Companies,
relType: 'company',
isAssociated: false
});
if (associateContentType && associateField && mainAssociateField) {
await createConformityMapping({
index: docIndex,
field: mainAssociateField,
values: doc[associateField] || [],
conformityTypeModel: associatedModel,
relType: associateContentType,
isAssociated: true
});
}
});
objects = await model.insertMany(docs);
}
// create conformity
if (contentType !== PRODUCT) {
const createConformity = async mapping => {
if (Object.keys(mapping).length === 0) {
return;
}
const conformityDocs: IConformityAdd[] = [];
objects.map(async (object, objectIndex) => {
const items = mapping[objectIndex] || [];
if (items.length === 0) {
return;
}
for (const item of items) {
item.mainTypeId = object._id;
}
conformityDocs.push(...items);
});
await Conformities.insertMany(conformityDocs);
};
await createConformity(associateMapping);
}
return { objects, updated };
};
connect().then(async () => {
if (cancel) {
return;
}
debugWorkers(`Worker message received`);
const {
user,
scopeBrandIds,
result,
contentType,
properties,
importHistoryId,
useElkSyncer,
percentage,
associateContentType,
associateField,
mainAssociateField,
rowIndex
}: {
user: IUserDocument;
scopeBrandIds: string[];
result: any;
contentType: string;
properties: Array<{ [key: string]: string }>;
importHistoryId: string;
percentage: number;
useElkSyncer: boolean;
associateContentType?: string;
associateField?: string;
mainAssociateField?: string;
rowIndex?: number;
} = workerData;
let model: any = null;
const isBoardItem = (): boolean =>
contentType === 'deal' ||
contentType === 'task' ||
contentType === 'ticket';
switch (contentType || associateContentType) {
case 'customer':
case 'lead':
model = Customers;
break;
case 'company':
model = Companies;
break;
case 'deal':
model = Deals;
break;
case 'task':
model = Tasks;
break;
case 'ticket':
model = Tickets;
break;
default:
break;
}
if (!Object.values(IMPORT_CONTENT_TYPE).includes(contentType)) {
throw new Error(`Unsupported content type "${contentType}"`);
}
const bulkDoc: any = [];
// Iterating field values
for (const fieldValue of result) {
const doc: any = {
scopeBrandIds,
customFieldsData: []
};
let colIndex: number = 0;
let boardName: string = '';
let pipelineName: string = '';
let stageName: string = '';
// Iterating through detailed properties
for (const property of properties) {
const value = (fieldValue[colIndex] || '').toString();
if (contentType === 'customer') {
doc.state = 'customer';
}
if (contentType === 'lead') {
doc.state = 'lead';
}
switch (property.type) {
case 'customProperty':
{
doc.customFieldsData.push({
field: property.id,
value: fieldValue[colIndex]
});
}
break;
case 'customData':
{
doc[property.name] = value;
}
break;
case 'ownerEmail':
{
const userEmail = value;
const owner = await Users.findOne({ email: userEmail }).lean();
doc[property.name] = owner ? owner._id : '';
}
break;
case 'pronoun':
{
doc.sex = generatePronoun(value);
}
break;
case 'companiesPrimaryNames':
{
doc.companiesPrimaryNames = value.split(',');
}
break;
case 'customersPrimaryEmails':
doc.customersPrimaryEmails = value.split(',');
break;
case 'boardName':
boardName = value;
break;
case 'pipelineName':
pipelineName = value;
break;
case 'stageName':
stageName = value;
break;
case 'categoryCode':
doc.categoryCode = value;
break;
case 'vendorCode':
doc.vendorCode = value;
break;
case 'tag':
{
const tagName = value;
let tag = await Tags.findOne({
name: new RegExp(`.*${tagName}.*`, 'i')
}).lean();
if (!tag) {
const type = contentType === 'lead' ? 'customer' : contentType;
tag = await Tags.createTag({ name: tagName, type });
}
doc[property.name] = tag ? [tag._id] : [];
}
break;
case 'assignedUserEmail':
{
const assignedUser = await Users.findOne({ email: value });
doc[property.name] = assignedUser ? [assignedUser._id] : [];
}
break;
case 'basic':
{
doc[property.name] = value;
if (property.name === 'primaryName' && value) {
doc.names = [value];
}
if (property.name === 'primaryEmail' && value) {
doc.emails = [value];
}
if (property.name === 'primaryPhone' && value) {
doc.phones = [value];
}
if (property.name === 'phones' && value) {
doc.phones = value.split(',');
}
if (property.name === 'emails' && value) {
doc.emails = value.split(',');
}
if (property.name === 'names' && value) {
doc.names = value.split(',');
}
if (property.name === 'isComplete') {
doc.isComplete = Boolean(value);
}
}
break;
} // end property.type switch
colIndex++;
} // end properties for loop
if (
(contentType === 'customer' || contentType === 'lead') &&
!doc.emailValidationStatus
) {
doc.emailValidationStatus = 'unknown';
}
if (
(contentType === 'customer' || contentType === 'lead') &&
!doc.phoneValidationStatus
) {
doc.phoneValidationStatus = 'unknown';
}
// set board item created user
if (isBoardItem()) {
doc.userId = user._id;
if (boardName && pipelineName && stageName) {
const board = await Boards.findOne({
name: boardName,
type: contentType
});
const pipeline = await Pipelines.findOne({
boardId: board && board._id,
name: pipelineName
});
const stage = await Stages.findOne({
pipelineId: pipeline && pipeline._id,
name: stageName
});
doc.stageId = stage && stage._id;
}
}
bulkDoc.push(doc);
}
const modifier: { $inc?; $push? } = {
$inc: { percentage }
};
try {
const { updated, objects } = await create({
docs: bulkDoc,
user,
contentType,
model,
useElkSyncer,
associateContentType,
associateField,
mainAssociateField
});
const cocIds = objects.map(obj => obj._id).filter(obj => obj);
modifier.$push = { ids: cocIds };
modifier.$inc.updated = updated;
modifier.$inc.success = bulkDoc.length;
} catch (e) {
let startRow = 1;
let endRow = bulkDoc.length;
if (rowIndex && rowIndex > 1) {
startRow = rowIndex * bulkDoc.length - bulkDoc.length;
endRow = rowIndex * bulkDoc.length;
}
const distance = endRow - startRow;
if (distance === 1) {
endRow = startRow;
}
debugWorkers(startRow, endRow, e.message, contentType);
// modifier.$push = { errorMsgs: e.message };
modifier.$inc.failed = bulkDoc.length;
modifier.$push = {
errorMsgs: {
startRow,
endRow,
errorMsgs: e.message,
contentType
}
};
}
await ImportHistory.updateOne({ _id: importHistoryId }, modifier);
mongoose.connection.close();
debugWorkers(`Worker done`);
parentPort.postMessage({
action: 'remove',
message: 'Successfully finished the job'
});
});