api/src/workers/utils.ts

import * as csvParser from 'csv-parser';
import * as dotenv from 'dotenv';
import * as fs from 'fs';
import * as mongoose from 'mongoose';
import * as path from 'path';
import * as readline from 'readline';
import { Writable } from 'stream';
import { checkFieldNames } from '../data/modules/fields/utils';
import utils, {
  createAWS,
  getConfig,
  getS3FileInfo,
  ISendNotification,
  uploadsFolderPath
} from '../data/utils';
import {
  CUSTOMER_SELECT_OPTIONS,
  NOTIFICATION_TYPES
} from '../db/models/definitions/constants';
import { default as ImportHistory } from '../db/models/ImportHistory';
import { debugError, debugWorkers } from '../debuggers';
import CustomWorker from './workerUtil';
import * as streamify from 'stream-array';
import * as os from 'os';
import { graphqlPubsub } from '../pubsub';

// Load environment variables before they are read below
dotenv.config();

const { MONGO_URL = '', ELK_SYNCER } = process.env;

export const connect = () =>
  mongoose.connect(MONGO_URL, { useNewUrlParser: true, useCreateIndex: true });

const WORKER_BULK_LIMIT = 300;

const myWorker = new CustomWorker();

export const IMPORT_CONTENT_TYPE = {
  CUSTOMER: 'customer',
  COMPANY: 'company',
  LEAD: 'lead',
  PRODUCT: 'product',
  DEAL: 'deal',
  TASK: 'task',
  TICKET: 'ticket'
};

export const generateUid = () => {
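  // Short pseudo-random id prefixed with "_"; not a UUID, so uniqueness is not guaranteed.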
  return (
    '_' +
    Math.random()
      .toString(36)
      .slice(2, 11)
  );
};
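
// Maps a sex/gender label (case-insensitive) to its CUSTOMER_SELECT_OPTIONS.SEX value, or '' when no option matches.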
export const generatePronoun = value => {
  const pronoun = CUSTOMER_SELECT_OPTIONS.SEX.find(
    sex => sex.label.toUpperCase() === value.toUpperCase()
  );

  return pronoun ? pronoun.value : '';
};

const getCsvInfo = (fileName: string, uploadType: string) => {
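  // Resolves { rows, columns } for the uploaded CSV: rows is the data row count (header excluded)
  // and columns is the raw header line, read either from the local uploads folder or from S3 via S3 Select.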
  return new Promise(async resolve => {
    if (uploadType === 'local') {
      const readStream = fs.createReadStream(`${uploadsFolderPath}/${fileName}`);

      let columns;
      let total = 0;

      const rl = readline.createInterface({
        input: readStream,
        terminal: false
      });

      rl.on('line', input => {
        if (total === 0) {
          columns = input;
        }

        total++;
      });
      rl.on('close', () => {
        // exclude the header row
        total--;

        debugWorkers(`Get CSV Info type: local, totalRow: ${total}`);

        resolve({ rows: total, columns });
      });
    } else {
      const AWS_BUCKET = await getConfig('AWS_BUCKET');
      const s3 = await createAWS();

      const params = { Bucket: AWS_BUCKET, Key: fileName };

      const rowCountString = await getS3FileInfo({
        s3,
        params,
        query: 'SELECT COUNT(*) FROM S3Object'
      });

      // exclude the header row from the count
      let total = Number(rowCountString);

      total--;

      const columns = await getS3FileInfo({
        s3,
        params,
        query: 'SELECT * FROM S3Object LIMIT 1'
      });

      debugWorkers(`Get CSV Info type: AWS, totalRow: ${total}`);

      return resolve({ rows: total, columns });
    }
  });
};

const importBulkStream = ({
  contentType,
  fileName,
  bulkLimit,
  uploadType,
  handleBulkOperation,
  associateContentType,
  associateField,
  mainAssociateField
}: {
  contentType: string;
  fileName: string;
  bulkLimit: number;
  uploadType: 'AWS' | 'local';
  handleBulkOperation: (
    rowIndex: number,
    rows: any,
    contentType: string,
    associatedContentType?: string,
    associatedField?: string,
    mainAssociateField?: string
  ) => Promise<void>;
  associateContentType?: string;
  associateField?: string;
  mainAssociateField?: string;
}) => {
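  // Streams the CSV from S3 or the local uploads folder, parses it row by row,
  // and flushes every `bulkLimit` rows to handleBulkOperation.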
  return new Promise(async (resolve, reject) => {
    let rows: any = [];
    let readStream;
    let rowIndex = 0;

    if (uploadType === 'AWS') {
      const AWS_BUCKET = await getConfig('AWS_BUCKET');

      const s3 = await createAWS();

      // Reject the promise on S3 stream errors; throwing inside the event handler
      // would surface as an uncaught exception instead.
      const errorCallback = error => {
        reject(new Error(error.code));
      };

      const params = { Bucket: AWS_BUCKET, Key: fileName };

      readStream = s3.getObject(params).createReadStream();
      readStream.on('error', errorCallback);
    } else {
      readStream = fs.createReadStream(`${uploadsFolderPath}/${fileName}`);
    }

    const write = (row, _, next) => {
      rowIndex++;
      rows.push(row);

      if (rows.length === bulkLimit) {
        return handleBulkOperation(
          rowIndex,
          rows,
          contentType,
          associateContentType,
          associateField,
          mainAssociateField
        )
          .then(() => {
            rows = [];
            next();
          })
          .catch(e => {
            debugError(`Error during bulk insert from csv: ${e.message}`);
            reject(e);
          });
      }

      return next();
    };

    readStream
      .pipe(csvParser())
      .pipe(new Writable({ write, objectMode: true }))
      .on('finish', () => {
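        // Flush the final (possibly partial) batch once the source stream ends.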
        rowIndex++;
        handleBulkOperation(
          rowIndex,
          rows,
          contentType,
          associateContentType,
          associateField,
          mainAssociateField
        )
          .then(() => resolve('success'))
          .catch(e => reject(e));
      })
      .on('error', e => reject(e));
  });
};

const getWorkerFile = fileName => {
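  // Resolves the worker script path for the current environment and build layout.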
  if (process.env.NODE_ENV !== 'production') {
    return `./src/workers/${fileName}.worker.import.js`;
  }

  if (fs.existsSync('./build/api')) {
    return `./build/api/workers/${fileName}.worker.js`;
  }

  return `./dist/workers/${fileName}.worker.js`;
};

export const clearEmptyValues = (obj: any) => {
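  // Removes keys whose values are '', 'unknown', or an empty array; mutates and returns the same object.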
  Object.keys(obj).forEach(key => {
    if (obj[key] === '' || obj[key] === 'unknown') {
      delete obj[key];
    }

    if (Array.isArray(obj[key]) && obj[key].length === 0) {
      delete obj[key];
    }
  });

  return obj;
};

export const updateDuplicatedValue = async (
  model: any,
  field: string,
  doc: any
) => {
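  // Overwrites the existing document matched on `field` with the incoming doc and refreshes modifiedAt.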
  return model.updateOne(
    { [field]: doc[field] },
    { $set: { ...doc, modifiedAt: new Date() } }
  );
};

// csv file import, cancel, removal
export const receiveImportRemove = async (content: any) => {
  try {
    debugWorkers(`Remove import called`);

    const { contentType, importHistoryId } = content;

    const handleOnEndWorker = async () => {
      debugWorkers(`Remove import ended`);
    };

    myWorker.setHandleEnd(handleOnEndWorker);

    const importHistory = await ImportHistory.getImportHistory(importHistoryId);

    const ids = importHistory.ids || [];

    if (ids.length === 0) {
      return { status: 'ok' };
    }

    const workerPath = path.resolve(getWorkerFile('importHistoryRemove'));

    const calc = Math.ceil(ids.length / WORKER_BULK_LIMIT);
    const results: any[] = [];

    for (let index = 0; index < calc; index++) {
      const start = index * WORKER_BULK_LIMIT;
      const end = start + WORKER_BULK_LIMIT;
      const row = ids.slice(start, end);

      results.push(row);
    }

    for (const result of results) {
      await myWorker.createWorker(workerPath, {
        contentType,
        importHistoryId,
        result
      });
    }

    return { status: 'ok' };
  } catch (e) {
    debugError(`Failed to remove import: ${e.message}`);
    throw e;
  }
};

export const receiveImportCancel = () => {
  myWorker.removeWorkers();

  return { status: 'ok' };
};

export const receiveImportCreate = async (content: any) => {
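  // Orchestrates a CSV import: validates each file and its columns, records totals on the
  // ImportHistory, then streams the main content type and spawns a bulkInsert worker per batch.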
  const {
    contentTypes,
    files,
    scopeBrandIds,
    user,
    uploadType,
    columnsConfig,
    importHistoryId,
    associatedContentType,
    associatedField
  } = content;

  const useElkSyncer = ELK_SYNCER !== 'false';

  const config: any = {};

  let total = 0;

  let mainType = contentTypes[0];

  if (associatedContentType) {
    mainType = associatedContentType;
  }

  for (const contentType of contentTypes) {
    const file = files[contentType];
    const columnConfig = columnsConfig[contentType];
    const fileName = file[0].url;

    const { rows, columns }: any = await getCsvInfo(fileName, uploadType);

    if (rows === 0) {
      throw new Error('Please import at least one row of data');
    }

    total = total + rows;

    const updatedColumns = (columns || '').replace(/\n|\r/g, '').split(',');

    let properties;
    try {
      properties = await checkFieldNames(
        contentType,
        updatedColumns,
        columnConfig
      );
    } catch (e) {
      debugWorkers(`Error during checkFieldNames: ${e.message}`);

      await ImportHistory.deleteOne({ _id: importHistoryId });

      throw new Error(e.message);
    }

    config[contentType] = { total: rows, properties, fileName };
  }

  await ImportHistory.updateOne(
    { _id: importHistoryId },
    {
      contentTypes,
      userId: user._id,
      date: Date.now(),
      total
    }
  );

  const getAssociatedField = contentType => {
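    // Returns the internal property name matching the user-selected associated field for this content type.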
    const properties = config[contentType].properties;

    let property;

    for (const value of properties) {
      if (value.fieldName.trim() === associatedField.trim()) {
        property = value;

        break;
      }
    }

    return property.name;
  };

  const updateImportHistory = async doc => {
    return ImportHistory.updateOne({ _id: importHistoryId }, doc);
  };

  const handleOnEndBulkOperation = async () => {
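    // Worker end handler: marks the import as done and notifies the owner once failed + success
    // reaches the total; otherwise, for associated imports, starts streaming the related content type.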
    const updatedImportHistory = await ImportHistory.findOne({
      _id: importHistoryId
    });

    let status = 'inProgress';

    if (!updatedImportHistory) {
      throw new Error('Import history not found');
    }

    if (
      updatedImportHistory.failed + updatedImportHistory.success ===
      updatedImportHistory.total
    ) {
      status = 'Done';
      await updateImportHistory({
        $set: { status, percentage: 100 }
      });

      const notifDoc: ISendNotification = {
        title: `Your ${updatedImportHistory.name} import is done`,
        action: ``,
        createdUser: ``,
        receivers: [user._id],
        content: `Your ${updatedImportHistory.name} import is done`,
        link: `/settings/importHistories?${mainType}`,
        notifType: NOTIFICATION_TYPES.IMPORT_DONE,
        contentType: 'import',
        contentTypeId: importHistoryId
      };

      await utils.sendNotification(notifDoc);

      graphqlPubsub.publish('importHistoryChanged', {});
    }

    if (associatedContentType && associatedField && status !== 'Done') {
      const contentType = contentTypes.find(value => value !== mainType);

      const mainAssociateField = getAssociatedField(mainType);

      const associateField = getAssociatedField(contentType);

      importBulkStream({
        contentType,
        fileName: config[contentType].fileName,
        uploadType,
        bulkLimit: WORKER_BULK_LIMIT,
        handleBulkOperation,
        associateContentType: associatedContentType,
        associateField,
        mainAssociateField
      });
    }

    debugWorkers(`Import create ended`);
  };

  const handleBulkOperation = async (
    rowIndex: number,
    rows: any,
    contentType: string,
    associateContentType?: string,
    associateField?: string,
    mainAssociateField?: string
  ) => {
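    // Turns each parsed CSV row into an array of cell values and hands the batch to a bulkInsert worker.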
    if (rows.length === 0) {
      return debugWorkers('Skipping empty batch: no rows to insert');
    }

    const result: unknown[] = [];

    for (const row of rows) {
      result.push(Object.values(row));
    }

    const workerPath = path.resolve(getWorkerFile('bulkInsert'));

    await myWorker.createWorker(workerPath, {
      rowIndex,
      scopeBrandIds,
      user,
      contentType,
      properties: config[contentType].properties,
      importHistoryId,
      result,
      useElkSyncer,
      percentage: Number(((result.length / total) * 100).toFixed(3)),
      associateContentType,
      associateField,
      mainAssociateField
    });
  };

  myWorker.setHandleEnd(handleOnEndBulkOperation);
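
  // Kick off the stream for the main content type. The returned promise is not awaited, so the
  // import runs in the background while the import history id is returned to the caller immediately.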

  importBulkStream({
    contentType: mainType,
    fileName: config[mainType].fileName,
    uploadType,
    bulkLimit: WORKER_BULK_LIMIT,
    handleBulkOperation
  });

  return { id: importHistoryId };
};

const importWebhookStream = ({
  data,
  bulkLimit,
  handleBulkOperation
}: {
  data: any[];
  bulkLimit: number;
  handleBulkOperation: (rows: any) => Promise<void>;
}) => {
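  // Streams an in-memory array of webhook rows through the same batching logic used for CSV files.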
  return new Promise(async (resolve, reject) => {
    let rows: any = [];

    const write = (row, _, next) => {
      rows.push(row);

      if (rows.length === bulkLimit) {
        return handleBulkOperation(rows)
          .then(() => {
            rows = [];
            next();
          })
          .catch(e => {
            debugError(`Error during bulk insert from webhook: ${e.message}`);
            reject(e);
          });
      }

      return next();
    };

    streamify(data, os.EOL)
      .pipe(new Writable({ write, objectMode: true }))
      .on('finish', () => {
        handleBulkOperation(rows)
          .then(() => resolve('success'))
          .catch(e => reject(e));
      })
      .on('error', e => reject(e));
  });
};

export const importFromWebhook = async (content: any) => {
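  // Imports rows received in a webhook payload, reusing the bulkInsert worker pipeline.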
  const { data, type } = content;

  const useElkSyncer = ELK_SYNCER !== 'false';

  if (data.length === 0) {
    throw new Error('Please import at least one row of data');
  }

  const properties = await checkFieldNames(type, Object.keys(data[0]));

  const handleBulkOperation = async (rows: any) => {
    if (rows.length === 0) {
      return debugWorkers('Skipping empty batch: no rows to insert');
    }

    const result: any[] = [];

    for (const row of rows) {
      result.push(Object.values(row));
    }

    const workerPath = path.resolve(getWorkerFile('bulkInsert'));

    await myWorker.createWorker(workerPath, {
      contentType: type,
      properties,
      result,
      useElkSyncer,
      percentage: 0
    });
  };

  importWebhookStream({
    data,
    bulkLimit: WORKER_BULK_LIMIT,
    handleBulkOperation
  });
};