bcgov/common-hosted-email-service

View on GitHub
app/src/services/dataSvc.js

Summary

Maintainability
B
6 hrs
Test Coverage
/**
 * @module DataService
 *
 * Service to persist data.
 *
 * @see DataConnection
 * @see knex
 * @see Objection
 *
 * @exports DataService
 */
const uuid = require('uuid');

const log = require('../components/log')(module.filename);
const { queueToStatus, statusState } = require('../components/state');
const utils = require('../components/utils');

const DataConnection = require('./dataConn');
const { Message, Queue, Status, Trxn } = require('./models/');

/**
 * @function createMessage
 * inserts all message related data into the db for a Trxn (transaction)
 * inserts an initial status and queue record.
 *
 * @param {string} transactionId - transaction parent record
 * @param {object} msg - API email message object
 * @param {object} trx - an Objection/Knex transaction for commit/rollback
 * @returns Message
 */
const createMessage = async (transactionId, msg, trx) => {
  const messageObj = await Message.query(trx).insert({
    messageId: uuid.v4(),
    transactionId: transactionId,
    tag: msg.tag,
    email: msg,
    delayTimestamp: msg.delayTS
  });

  // Insert initial status and queue accepted records
  await Status.query(trx).insert({
    messageId: messageObj.messageId
  });
  await Queue.query(trx).insert({
    messageId: messageObj.messageId
  });

  return messageObj;
};

/**
 * @function getClientTrxnQuery
 * @description Gets a list of all transactions created by `client`
 *
 * @param {string} client - the client
 * @returns An array of transactionIds
 */
const getClientTrxnQuery = client => {
  return Trxn.query()
    .select('transactionId')
    .where('client', client);
};

class DataService {

  /**
   * Creates a new DataService with default connection.
   * @class
   */
  constructor() {
    this.connection = new DataConnection();
  }

  /**
   * @function connection
   * Gets the current DataConnection
   */
  get connection() {
    return this._connection;
  }

  /**
   * @function connection
   * Sets the current DataConnection
   * @param {object} v - an DataConnection
   */
  set connection(v) {
    this._connection = v;
  }

  /**
   * @function createTransaction
   * Creates a Trxn (transaction) record with associated messages
   *
   * @param {string} client - the authorized party / client
   * @param {object} msg - the API email message or template.
   * @returns {object} Trxn object, fully populated with child messages and status
   */
  async createTransaction(client, msg) {
    if (!msg) {
      throw Error('Transaction cannot be created with email message(s)');
    }
    let trx;
    try {
      trx = await Trxn.startTransaction();
      const transactionId = uuid.v4();

      await Trxn.query(trx).insert({
        transactionId: transactionId,
        client: client
      });

      if (Array.isArray(msg)) {
        await Promise.all(msg.map(m => {
          return createMessage(transactionId, m, trx);
        }));
      } else {
        await createMessage(transactionId, msg, trx);
      }

      await trx.commit();
      return this.readTransaction(client, transactionId);
    } catch (err) {
      log.error(`Error creating transaction record: ${err.message}. Rolling back...`, { error: err, function: 'createTransaction' });
      if (trx) await trx.rollback();
      throw err;
    }
  }

  /**
   * @function deleteMessageEmail
   * Deletes the email data from a message.
   * We don't want to retain any private-ish data longer than required to perform our task.
   *
   * @param {string} client- the authorized party / client
   * @param {string} messageId - the id of the message we want to purge email
   * @throws NotFoundError if message for client not found
   * @returns {object} Message object, fully populated.
   */
  async deleteMessageEmail(client, messageId) {
    let trx;
    try {
      trx = await Message.startTransaction();

      // first query for message, throw not found if client/message not exist...
      await this.readMessage(client, messageId, trx);

      const cItems = await Message.query(trx)
        .patch({ email: null })
        .where('messageId', messageId);
      log.info(`Updated ${cItems} message email records...`, { function: 'deleteMessageEmail' });

      await trx.commit();
      return this.readMessage(client, messageId);
    } catch (err) {
      log.error('DataService.deleteMessageEmail', `Error updating message (email) record: ${err.message}. Rolling back...`, { error: err, function: 'deleteMessageEmail' });
      if (trx) await trx.rollback();
      throw err;
    }
  }

  /**
   * @function findMessagesByQuery
   * @description Finds the set of messages that matches the search criteria
   *
   * @param {string} client - the authorized party / client
   * @param {string} messageId - the id of the desired message
   * @param {string} status - the desired status of the messages
   * @param {string} tag - the desired tag of the messages
   * @param {string} transactionId - the id of the desired transaction
   * @throws NotFoundError if no messages were found
   * @returns {object[]} Array of Message objects
   */
  async findMessagesByQuery(client, messageId, status, tag, transactionId) {
    const parameters = utils.dropUndefinedObject({
      messageId: messageId,
      status: status,
      tag: tag,
      transactionId: transactionId
    });

    return Message.query()
      .whereIn('transactionId', getClientTrxnQuery(client))
      .andWhere(parameters)
      .throwIfNotFound();
  }

  /**
   * @function messageExists
   * Determines if a Message exists
   *
   * @param {string} client - the authorized party / client
   * @param {string} messageId - the id of the message we want
   * @returns {boolean} True if `messageId` for `client` exists
   */
  async messageExists(client, messageId) {
    const msg = await Message.query()
      .findById(messageId)
      .whereIn('transactionId', getClientTrxnQuery(client));

    return !!msg;
  }

  /**
   * @function readMessage
   * Read a Message from the db
   *
   * @param {string} client - the authorized party / client
   * @param {string} messageId - the id of the message we want
   * @param {object} [trx] - optional transaction wrapper
   * @throws NotFoundError if message for client not found
   * @returns {object} Message object, fully populated.
   */
  async readMessage(client, messageId, trx = undefined) {
    return Message.query(trx)
      .findById(messageId)
      .whereIn('transactionId', getClientTrxnQuery(client))
      .withGraphJoined('statusHistory')
      .modifyGraph('statusHistory', builder => {
        builder.orderBy('createdAt', 'desc');
      })
      .throwIfNotFound();
  }

  /**
   * @function readTransaction
   * Read a Transaction (Trxn) from the db
   *
   * @param {string} client - the authorized party / client
   * @param {string} transactionId - the id of the transaction we want
   * @throws NotFoundError if transaction for client not found
   * @returns {object} Trxn object
   */
  async readTransaction(client, transactionId) {
    return Trxn.query()
      .findById(transactionId)
      .where('client', client)
      .withGraphJoined('messages.statusHistory')
      .orderBy('messages:statusHistory.createdAt', 'desc')
      .throwIfNotFound();
  }

  /**
   * @function updateMessageSendResult
   * Updates the message's send result field.
   * The send result is populated once the email has been sent.
   *
   * @param {string} client- the authorized party / client
   * @param {string} messageId - the id of the message
   * @param {object} sendResult - the pared down SMTP result
   * @throws NotFoundError if message for client not found
   * @returns {object} Message object, fully populated.
   */
  async updateMessageSendResult(client, messageId, sendResult) {
    let trx;
    try {
      trx = await Message.startTransaction();

      // first query for message, throw not found if client/message not exist...
      await this.readMessage(client, messageId, trx);

      const cItems = await Message.query(trx)
        .patch({ sendResult: sendResult })
        .where('messageId', messageId);
      log.info(`Updated ${cItems} message (result) records...`, { function: 'updateMessageSendResult' });

      await trx.commit();
      return this.readMessage(client, messageId);
    } catch (err) {
      log.error(`Error updating message send result record: ${err.message}. Rolling back...`, { error: err, function: 'updateMessageSendResult' });
      if (trx) await trx.rollback();
      throw err;
    }
  }

  /**
   * @function updateStatus
   * Updates the message's current status.
   * Adds a new queue processing status (Queue) record.
   * Determines the business status and if required, add a new business status (Status) record.
   *
   * @param {string} client - the authorized party / client
   * @param {string} messageId - the id of the message we want to purge email content
   * @param {string} status - the queue processing status
   * @param {string} description - optional description, mostly used for error/failure statuses
   * @throws NotFoundError if message for client not found
   * @returns {object} Message object
   */
  async updateStatus(client, messageId, status, description) {
    let trx;
    try {
      trx = await Message.startTransaction();

      // first query for message, throw not found if client/message not exist...
      const msg = await this.readMessage(client, messageId, trx);

      const businessStatus = queueToStatus(status);
      // Update business status if it has a description or is different than current state
      if (description || msg.status !== businessStatus) {
        // Only update message status if in appropriate states
        if (!msg.sendResult || msg.sendResult && businessStatus === statusState.COMPLETED) {
          await msg.$query(trx).patch({ status: businessStatus });
        }
        // Add a new status record
        await Status.query(trx).insert({
          messageId: messageId,
          status: businessStatus,
          description: description
        });
      }

      // Always add a new queue record
      await Queue.query(trx).insert({
        messageId: messageId,
        status: status,
        description: description
      });

      await trx.commit();
      return this.readMessage(client, messageId);
    } catch (err) {
      log.error(`Error updating message statuses record: ${err.message}. Rolling back...`, { error: err, function: 'updateStatus' });
      if (trx) await trx.rollback();
      throw err;
    }
  }
}

module.exports = DataService;